Problem using Phaser.awaitAdvanceInterruptibly()


R.A. Porter
I'm trying to write a multiplexed input stream where multiple consumers can read a common IO stream concurrently without making multiple copies of the stream in memory. My first pass used a CyclicBarrier, but a friend pointed out that it wasn't flexible enough to allow one of the consumers to drop out early by calling close() on the stream.

I've updated to use Phaser and, while I have a lot of tests to write and perform, it appears that almost everything is working as it should be. The only issue I have is with the use of awaitAdvanceInterruptibly().

I don't want a single bad consumer that blocks, or otherwise takes too long to consume the current in-memory buffer, to make all my other consumers block on phase advance. I thought I'd be able to manage that with the awaitAdvanceInterruptibly() timeout, but either I've used it wrong or I completely misunderstand its purpose and function.

Any guidance would be much appreciated. If I can't get this to work with Phaser, I may need to drop to simpler concurrency controls and manage the thread interaction with a lot more manual code. That's certainly doable, but definitely not preferred.

N.B. This is a work in progress; it does not reflect my best, final efforts.

-R.A. Porter

===
/*
 Copyright (C) 2016 R.A. Porter
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package com.dreamloom.multiplex;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * Creates a multiplexed input stream, capable of parallel, concurrent reads by multiple consumers.
 * <p>
 * Input streams are depleted as they are consumed; this decorator allows multiple consumers to read
 * from the same stream without making multiple copies and overfilling memory. Because it operates
 * in parallel, it also provides the performance benefit of concurrent stream reading without adding
 * complexity to client code.
 * <p>
 * If any of the {@code Consumer} threads throw an exception or take longer than the configured time
 * to read a chunk of data from memory ({@link #AWAIT_SECONDS} seconds), the multiplexer will fail
 * all the consumer threads.
 */
public class MultiplexInputStream extends InputStream {
    private static final int DEFAULT_BUFFER_SIZE = 2048;

    private static final int AWAIT_SECONDS = 10;

    private static final ThreadLocal<Integer> INDEX = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };

    private static final String MULTIPLEX_THREAD_PREFIX = "Multiplex";

    private final BufferedInputStream bis;

    private final byte[] buffer;

    private Phaser phaser;

    private int currentReadBytes;

    private ExecutorService service;

    private final AtomicInteger factoryIdx = new AtomicInteger(0);

    /**
     * Creates a new {@code MultiplexInputStream} instance with an in-memory buffer of default
     * size, 2048 bytes.
     *
     * @param source the underlying input stream to decorate and multiplex
     * @throws IOException if there is a problem reading the underlying stream
     */
    public MultiplexInputStream(InputStream source) throws IOException {
        this(source, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a new {@code MultiplexInputStream} instance with an in-memory buffer of
     * {@code bufferSize} bytes.
     *
     * @param source     the underlying input stream to decorate and multiplex
     * @param bufferSize the size of the in-memory buffer
     * @throws IOException if there is a problem reading the underlying stream
     */
    public MultiplexInputStream(InputStream source, int bufferSize) throws IOException {
        buffer = new byte[bufferSize];
        bis = new BufferedInputStream(source);

        currentReadBytes = bis.read(buffer);
        if (currentReadBytes == -1) {
            throw new IllegalStateException(
                    "Error initializing stream; no content found in source.");
        }
    }

    /**
     * Starts the multiplexer, running the list of provided {@code Consumer}s to read the stream.
     * <p>
     * It is the responsibility of the caller to ensure that each of the provided {@code Consumer}s
     * correctly ingests and consumes the {@code InputStream} and does not block.
     *
     * @param consumers the list of processors reading the {@code InputStream}
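     * @throws ExecutionException   if any consumer throws while reading the stream
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for the consumers to finish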
     */
    public void invoke(List<Consumer<InputStream>> consumers)
            throws ExecutionException, InterruptedException {
        phaser = new Phaser(consumers.size()) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                if (registeredParties == 0) {
                    return true;
                }
                try {
                    currentReadBytes = bis.read(buffer);
                } catch (IOException e) {
                    // Failure to read the underlying stream should result in a failure of all running
                    // consumer threads.
                    throw new UncheckedIOException(e);
                }

                return false;
            }
        };

        service = Executors.newFixedThreadPool(consumers.size(),
                r -> new Thread(r, getConsumerThreadName()));

        Set<Future> futures = consumers.stream()
                .map(consumer -> service.submit(() -> consumer.accept(MultiplexInputStream.this)))
                .collect(Collectors.toSet());

        for (Future future : futures) {
            future.get();
        }
    }

    @Override
    public int read() throws IOException {
        Integer index = INDEX.get();
        // Block if this thread has gotten the last byte of the current buffer
        if (index == currentReadBytes) {
            try {
                phaser.awaitAdvanceInterruptibly(phaser.arrive(), AWAIT_SECONDS, TimeUnit.SECONDS);

                index = 0;
                INDEX.set(index);
            } catch (InterruptedException | TimeoutException e) {
                throw new IOException("Error processing multiplexed input", e);
            }
        }

        if (currentReadBytes == -1) {
            return -1;
        }

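        // InputStream.read() must return 0-255; mask the signed byte so that a
        // 0xFF value is not mistaken for end of stream (-1).
        int b = buffer[index++] & 0xFF;
        INDEX.set(index);
        return b;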
    }

    @Override
    public void close() throws IOException {
        // If caller is one of the consumer threads, arriveAndDeregister on its behalf;
        // else, close down resources
        synchronized (MULTIPLEX_THREAD_PREFIX) {
            if (Thread.currentThread()
                    .getName()
                    .startsWith(MULTIPLEX_THREAD_PREFIX)) {
                phaser.arriveAndDeregister();
            } else {
                bis.close();
                service.shutdownNow();
            }
        }
    }

    private String getConsumerThreadName() {
        return String.format("%s-%d-%s", MULTIPLEX_THREAD_PREFIX, factoryIdx.getAndIncrement(),
                UUID.randomUUID()
                        .toString());
    }
}

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Problem using Phaser.awaitAdvanceInterruptibly()

Jens Wilke
On Saturday 18 June 2016 21:33:35 R.A. Porter wrote:
> I'm trying to write a multiplexed input stream where multiple consumers can read a common IO stream concurrently without making multiple copies of the stream in memory.

Multiplexing means combining multiple streams into one, e.g. by chunking, with each chunk carrying an identifier of its source stream. Reading a multiplexed stream would actually be demultiplexing. I don't think that is what you are trying to do here.

You just want to forward the identical input stream data to multiple consumers, correct?

Cheers,

Jens

--
"Everything superfluous is wrong!"

   // Jens Wilke - headissue GmbH - Germany
 \//  https://headissue.com

Re: Problem using Phaser.awaitAdvanceInterruptibly()

R.A. Porter

Right you are, Jens. It indeed should be called something different.

-R.A. Porter




Re: Problem using Phaser.awaitAdvanceInterruptibly()

Alex Otenko

The crucial point here is how dynamic the reader count is. If it is fixed, then no solution is lock-free, and the slowest-reader problem will manifest itself in some form. If it is not fixed, then you need to define the condition under which the buffer can be freed.

Alex



Re: Problem using Phaser.awaitAdvanceInterruptibly()

R.A. Porter

My preference for resolving the slowest-reader problem in this case is to time out and fail all the readers. The reader count is not completely fixed, as the consumers can deregister mid-process; I'm just hoping to determine when that happens. As I understood awaitAdvanceInterruptibly(), it should have timed out my other threads and allowed me to terminate the process.

-r
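
A note on the behaviour discussed here, offered as a sketch rather than a diagnosis of the posted code: as the Phaser Javadoc describes it, the timeout passed to awaitAdvanceInterruptibly() applies only to the calling thread. A TimeoutException in one waiter leaves the phaser itself untouched, so the other waiters keep waiting on their own timers and the slow party stays registered. One possible way to turn a single timeout into a failure for every party is to call forceTermination() when it happens. The names PhaserTimeoutSketch and arriveAndAwait below are hypothetical.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PhaserTimeoutSketch {
    /**
     * Arrives at the phaser and waits for the phase to advance. If this caller
     * times out, the phaser is terminated so that every other waiter is released
     * with a negative phase instead of waiting out its own timer.
     */
    static int arriveAndAwait(Phaser phaser, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        int phase = phaser.arrive();
        if (phase < 0) {
            throw new IllegalStateException("phaser already terminated");
        }
        try {
            int next = phaser.awaitAdvanceInterruptibly(phase, timeout, unit);
            if (next < 0) {
                throw new IllegalStateException("phaser terminated by another party");
            }
            return next;
        } catch (TimeoutException e) {
            phaser.forceTermination(); // fail everyone, not just this thread
            throw e;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(2); // two parties; the second never arrives
        try {
            arriveAndAwait(phaser, 200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out, terminated = " + phaser.isTerminated());
        }
    }
}
```

After forceTermination(), further calls to arrive() return a negative value, which is how a late consumer could notice that the run has been abandoned.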




Re: Problem using Phaser.awaitAdvanceInterruptibly()

Alex Otenko

You are better off having a sequencer and a singly linked list of non-reusable buffers. Then you don't need to know how many readers are looking at the stream.

You can reuse the buffers, if the reader can tell it is the last reader. This is where the reader count is needed.
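
One reading of this suggestion, sketched under assumptions: a single producer (taken here to be the "sequencer") appends freshly allocated chunks to a linked chain, and each reader holds only a reference to its current chunk. Chunks that every reader has passed become unreachable and are left to the garbage collector, so no reader count is needed. The names ChunkChainSketch, Chunk, Reader and pump are hypothetical, and the sketch applies no back-pressure, so a fast producer can run arbitrarily far ahead of a slow reader.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class ChunkChainSketch {
    /** One never-reused chunk of stream data; a length of -1 marks end of stream. */
    static final class Chunk {
        final byte[] data;
        final int length;
        private Chunk next; // guarded by this chunk's monitor

        Chunk(byte[] data, int length) {
            this.data = data;
            this.length = length;
        }

        synchronized void link(Chunk successor) {
            next = successor;
            notifyAll(); // wake readers waiting for more data
        }

        synchronized Chunk awaitNext() throws InterruptedException {
            while (next == null) {
                wait();
            }
            return next;
        }
    }

    /** Per-reader view of the chain: a current chunk plus an offset into it. */
    static final class Reader extends InputStream {
        private Chunk current;
        private int offset;

        Reader(Chunk head) {
            current = head;
        }

        @Override
        public int read() throws IOException {
            try {
                // Move along the chain once the current chunk is exhausted.
                while (current.length != -1 && offset == current.length) {
                    current = current.awaitNext();
                    offset = 0;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting for data", e);
            }
            return current.length == -1 ? -1 : current.data[offset++] & 0xFF;
        }
    }

    /** Producer: reads the source exactly once, appending a new chunk per read. */
    static void pump(InputStream source, Chunk head, int chunkSize) throws IOException {
        Chunk tail = head;
        int n;
        do {
            byte[] buf = new byte[chunkSize];
            n = source.read(buf);
            Chunk chunk = new Chunk(buf, n);
            tail.link(chunk);
            tail = chunk;
        } while (n != -1);
    }

    public static void main(String[] args) throws IOException {
        Chunk head = new Chunk(new byte[0], 0); // empty sentinel every reader starts from
        Reader a = new Reader(head);
        Reader b = new Reader(head);
        // For brevity the producer runs to completion first; in real use it would
        // run on its own thread while the readers consume concurrently.
        pump(new ByteArrayInputStream("hello".getBytes(StandardCharsets.US_ASCII)), head, 2);
        for (int x; (x = a.read()) != -1; ) {
            System.out.print((char) x);
        }
        System.out.println();
        for (int x; (x = b.read()) != -1; ) {
            System.out.print((char) x);
        }
        System.out.println();
    }
}
```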



Re: Problem using Phaser.awaitAdvanceInterruptibly()

Alexei Kaigorodov
In reply to this post by R.A. Porter

You are trying to use the same instance of MultiplexInputStream for each
consumer, while each consumer has its own read pointer. You save that
pointer in a thread-local variable. As a result, the pointer is available
to the MultiplexInputStream only while that consumer is calling read().
The pointers of the other consumers are not available. If they were,
MultiplexInputStream could tell when some part of the buffer has been read
by all consumers, refill it from the source, and allow fast consumers to
read more bytes, thus increasing the level of parallelism.

So my advice is to a) split MultiplexInputStream into two classes, one
representing the source and the buffer, and the second representing the
state of a consumer, b) use the buffer as a ring buffer, and c) not use a
thread local. As a result, you will not need to use Phaser at all;
synchronized/wait/notify would be enough.
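
The three points above might be sketched roughly as follows. This is an illustration under stated assumptions, not the poster's implementation: the names SharedRingSketch, Source and Cursor are hypothetical, all cursors must be opened before reading starts, the capacity must be positive, reads after close() are not guarded, and the refill does blocking I/O while holding the monitor.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SharedRingSketch {
    /** Shared side: owns the source stream, the ring buffer, and every consumer's cursor. */
    static final class Source {
        private final InputStream in;
        private final byte[] ring;
        private final List<Cursor> cursors = new ArrayList<>();
        private long written; // total bytes copied from the source so far
        private boolean eof;

        Source(InputStream in, int capacity) { this.in = in; this.ring = new byte[capacity]; }

        /** Registers a consumer; in this sketch that must happen before any read. */
        synchronized Cursor open() {
            if (written > 0) throw new IllegalStateException("reading has already started");
            Cursor c = new Cursor(this);
            cursors.add(c);
            return c;
        }

        synchronized int read(Cursor c) throws IOException, InterruptedException {
            while (true) {
                if (c.position < written) {
                    int b = ring[(int) (c.position % ring.length)] & 0xFF;
                    c.position++;
                    notifyAll(); // the slowest cursor may have moved, freeing a slot
                    return b;
                }
                if (eof) return -1;
                if (written - slowest() < ring.length) {
                    fill(); // room in the ring: fetch more from the source
                } else {
                    wait(); // ring is full until the slowest cursor advances or closes
                }
            }
        }

        synchronized void close(Cursor c) {
            cursors.remove(c); // a departed consumer no longer holds the ring back
            notifyAll();
        }

        private long slowest() {
            long min = written;
            for (Cursor c : cursors) min = Math.min(min, c.position);
            return min;
        }

        // Reads into the free, contiguous part of the ring (up to the array's end).
        private void fill() throws IOException {
            int start = (int) (written % ring.length);
            int free = ring.length - (int) (written - slowest());
            int n = in.read(ring, start, Math.min(free, ring.length - start));
            if (n == -1) eof = true; else written += n;
            notifyAll();
        }
    }

    /** Per-consumer side: only a position in the shared stream, so no thread local. */
    static final class Cursor extends InputStream {
        private final Source source;
        long position; // guarded by the Source monitor

        Cursor(Source source) { this.source = source; }

        @Override
        public int read() throws IOException {
            try {
                return source.read(this);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while waiting for data");
            }
        }

        @Override
        public void close() { source.close(this); }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello world".getBytes(StandardCharsets.US_ASCII);
        Source source = new Source(new ByteArrayInputStream(data), 4);
        Cursor a = source.open();
        Cursor b = source.open();
        // Interleaved reads on one thread keep the demo deterministic; real
        // consumers would each read their own cursor on their own thread.
        for (int x; (x = a.read()) != -1; ) {
            System.out.print((char) x);
            System.out.print((char) b.read());
        }
        System.out.println();
    }
}
```

Because a fast consumer simply waits when the ring is full, a stalled consumer still blocks the others here; a timed wait(long) followed by failing the run would be one way to bound that, at the cost of extra bookkeeping.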


