Awaiting a set of tasks on an ExecutorService


Awaiting a set of tasks on an ExecutorService

Shevek
Hi,

I need to build a complicated object on a multiprocessor system. The
target object uses synchronization to guarantee thread-safe access.
However, the individual tasks which build elements of the target object
are expensive, so I farm them out to an ExecutorService. The question is:

What is the recommended way to submit a stream of tasks to an
ExecutorService and then, at some point, wait until they're all done?

* I don't know the list of tasks up front. I read them iteratively from
a set of files or events.
* A big collection of Future(s) isn't realistic; there are just too many
tasks.
* Reaping completed futures on submission means that I end up with
exceptions in weird places; I really need to gather at most 100
suppressed exceptions and discard the rest.
* ForkJoinPool has invoke() but I think for a million-task job, I still
end up with a huge list of futures. This also assumes I ignore the note
about synchronization.
* CompletionService allows me to wait for _any_ of the submitted tasks,
but not for _all_ of them.

* Bonus points for sharing a single ExecutorService and having "sets" of
tasks which can be independently awaited. This starts to sound very like
FJP, except for the blocking/I/O and the stream-ness of my task set.

Mostly, dear list, I'm expecting a one-liner from some API class that
I've missed because this is NOT an original request, I just can't find a
standard solution.

Thank you.

S.
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Awaiting a set of tasks on an ExecutorService

Josh Humphries-2
I think the easiest thing would be to decorate each task to call "latch.countDown()" on a CountDownLatch that is initialized with the total number of tasks. After they are all submitted, the code that wants to wait for them to finish would simply await the latch. This decouples completion of all tasks from the actual ExecutorService that is running them, so you can share the same ExecutorService for multiple, even overlapping, sets of tasks.
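As a minimal sketch of this decoration (class and variable names here are illustrative, not from the thread, and it assumes the task count is known up front):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int taskCount = 3;                          // must be known up front
        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            Runnable task = () -> { /* expensive work here */ };
            // Decorate: count down even if the task throws.
            pool.submit(() -> {
                try { task.run(); } finally { latch.countDown(); }
            });
        }
        latch.await();   // blocks until every decorated task has counted down
        pool.shutdown();
        System.out.println("all " + taskCount + " tasks done");
    }
}
```

The same pool can serve several such sets at once, each set with its own latch.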

----
Josh Humphries
[hidden email]


Re: Awaiting a set of tasks on an ExecutorService

Josh Humphries-2
I forgot to address the concern about error handling. For that, you could collect the exceptions into a synchronized list, where you simply ignore failures once the size of the list reaches the limit.

Another possibility is to use an ExecutorCompletionService. You can create a new one for each set of tasks, submit all tasks to it, and then poll until you have reaped as many completions as you submitted. With that approach, you won't need to decorate the tasks; instead you handle the errors and track how many tasks you've polled in the code that waits for them to finish.

The ExecutorCompletionService also lets you re-use an ExecutorService. You create a new CompletionService for each group of tasks, but they can all wrap the same underlying ExecutorService.
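A hedged sketch of this per-set approach (all names illustrative); the key point is to take() exactly as many completions as were submitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // One CompletionService per "set" of tasks; the pool itself is shared.
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        int submitted = 0;
        for (int i = 0; i < 5; i++) {
            final int n = i;
            cs.submit(() -> {
                if (n == 3) throw new IllegalStateException("task " + n + " failed");
                return null;           // tasks all return Void
            });
            submitted++;
        }
        List<Throwable> errors = new ArrayList<>();
        int maxErrors = 100;
        for (int i = 0; i < submitted; i++) {   // reap exactly as many as submitted
            try {
                cs.take().get();
            } catch (ExecutionException e) {
                if (errors.size() < maxErrors) errors.add(e.getCause());
            }
        }
        pool.shutdown();
        System.out.println("done, errors=" + errors.size());  // errors=1
    }
}
```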


----
Josh Humphries
[hidden email]


Re: Awaiting a set of tasks on an ExecutorService

Shevek
In reply to this post by Josh Humphries-2
I can't use CountDownLatch because of the streamy nature of the source.
I have no idea how many tasks there are going to be up front. I read an
incoming stream and it turns out there are about 50 million tasks in it,
and this is going to go up by the usual orders of magnitude.

I could use a List of CountDownLatches each with ... but then I rapidly
get into "Someone cleverer than me MUST have solved this before" territory.

I can do my own tricks with an AtomicLong and wait()/notify(), but I
have to make sure the master thread calls get() on the relevant Future
each time a job finishes, otherwise there isn't a happens-before
relationship with the master thread, so I'd need the job to put its own
Future onto a Deque, or something...?

The tasks all return Void, but it's nice to collect the first few
exceptions (currently there are about 750,000 total exceptions thrown).

S.


Re: Awaiting a set of tasks on an ExecutorService

Josh Humphries-2
Sorry that my suggestion got broken into two messages. Indeed my first suggestion was incomplete, but the second suggestion, of using a CompletionService, should suffice. Would you queue up 50 million tasks in the scenario you describe, or do you also need a solution that provides back-pressure?

Without back-pressure, it's simple enough to submit everything to a CompletionService and then poll as many as you submitted when done. But that also will buffer up all results as they complete (and if using a limited-capacity queue, it could deadlock if it's the same thread submitting as polling).

If you need backpressure, I'd imagine something more like this:
  • One thread pulling tasks as they come and submitting to the CompletionService.
    • Before submitting each task:
      • Acquire a permit from a semaphore that has been setup with the maximum number of in-flight/queued tasks you want.
      • Increment a counter (e.g. AtomicLong).
    • After submitting the last task (e.g. end of stream has been detected), set a flag (AtomicBoolean) and then also submit a no-op task (sentinel) to the CompletionService.
      • Acquire permit and increment counter, even for the no-op task
  • Have a second thread that is awaiting completion of all tasks. It is in a loop polling the CompletionService.
    • After polling a task, add exception to list of failures if list is not already maxed out (e.g. already extracted 100 failures).
    • Also release a permit back to the semaphore and decrement the counter.
    • If counter has reached zero and flag is set, all tasks are processed. Done!
    • Otherwise, go back to top of loop, to poll next task.
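The outline above can be sketched roughly as follows (all names hypothetical, and the limits shrunk for illustration; the sentinel's flag is set only after its counter increment, so a zero count with the flag set reliably means everything is done):

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class BackpressureDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        Semaphore permits = new Semaphore(8);         // max in-flight/queued tasks
        AtomicLong inFlight = new AtomicLong();
        AtomicBoolean lastSubmitted = new AtomicBoolean();
        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());

        // Second thread: polls completions, releases permits, collects failures.
        Thread reaper = new Thread(() -> {
            try {
                while (true) {
                    Future<Void> f = cs.take();
                    try { f.get(); }
                    catch (ExecutionException e) {
                        if (errors.size() < 100) errors.add(e.getCause());
                    }
                    permits.release();
                    // Zero count plus end-of-stream flag: all tasks processed.
                    if (inFlight.decrementAndGet() == 0 && lastSubmitted.get()) return;
                }
            } catch (InterruptedException ignored) { }
        });
        reaper.start();

        // First thread: pulls tasks from a stream of unknown length.
        for (int i = 0; i < 100; i++) {
            permits.acquire();            // backpressure: wait for capacity
            inFlight.incrementAndGet();
            cs.submit(() -> null);        // stand-in for the expensive task
        }
        // End of stream: sentinel no-op, counted like a normal task.
        permits.acquire();
        inFlight.incrementAndGet();
        lastSubmitted.set(true);
        cs.submit(() -> null);

        reaper.join();
        pool.shutdown();
        System.out.println("all done, errors=" + errors.size());
    }
}
```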

----
Josh Humphries
[hidden email]


Re: Awaiting a set of tasks on an ExecutorService

Nathan and Ila Reynolds
In reply to this post by Shevek
What about CountedCompleter?  The master thread registers a callback via
CountedCompleter.onCompletion().  Call
CountedCompleter.addToPendingCount() before a new task is enqueued. When
a task is completed, call CountedCompleter.tryComplete().

The master thread should call addToPendingCount(1) when the stream of
tasks is first started.  The master thread should call tryComplete()
when the end of the stream is reached.  Otherwise, onCompletion() might
be called prematurely because the master thread is too slow at adding tasks.
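A rough sketch adapting this idea (names are illustrative). One subtlety of tryComplete(): it fires onCompletion() only when a call observes a pending count of zero, so in this adaptation the master skips the initial addToPendingCount(1) and its single end-of-stream tryComplete() serves as the final arrival; since each task's count is added before the task is enqueued, no task can observe zero prematurely:

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompleterDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountedCompleter<Void> root = new CountedCompleter<Void>() {
            @Override public void compute() { }  // never forked; used only as a counter
            @Override public void onCompletion(CountedCompleter<?> caller) {
                System.out.println("all tasks done");   // the registered callback
            }
        };
        for (int i = 0; i < 20; i++) {           // stream of unknown length
            root.addToPendingCount(1);           // before each task is enqueued
            pool.submit(() -> {
                try { /* expensive work */ }
                finally { root.tryComplete(); }  // decrements the pending count
            });
        }
        root.tryComplete();  // end of stream: the one call that can observe zero
        root.join();         // returns once onCompletion has fired
        pool.shutdown();
        System.out.println("joined");
    }
}
```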

If CountedCompleter does not work, consider Phaser.bulkRegister() and
arrive().
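The Phaser variant might look like this (illustrative names). Dynamic register()/arriveAndDeregister() fits the streaming case well, though note a Phaser allows at most 65535 registered parties, so the number of in-flight tasks must be bounded (e.g. by the backpressure discussed elsewhere in the thread), since arriveAndDeregister() frees a party as each task finishes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // One party for the master keeps the phase open while streaming.
        Phaser phaser = new Phaser(1);
        for (int i = 0; i < 50; i++) {            // stream of unknown length
            phaser.register();                    // one party per task, on the fly
            pool.submit(() -> {
                try { /* expensive work */ }
                finally { phaser.arriveAndDeregister(); }
            });
        }
        // Master arrives and waits for every registered task to arrive.
        phaser.arriveAndAwaitAdvance();
        pool.shutdown();
        System.out.println("all tasks done");
    }
}
```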

-Nathan


Re: Awaiting a set of tasks on an ExecutorService

Dávid Karnok
In reply to this post by Shevek
I don't think there is a one-liner solution for your case, given the number of tasks, the exception handling and the other constraints.

You may consider using a reactive library for the job; these libraries are data-centric instead of task-centric and can manage an unknown number of data items, asynchrony, parallelism and error handling for you.

Due to this data-centric viewpoint, I can only give a general feel for how I'd do file-lines processing with RxJava:

ExpensiveObject o = new ExpensiveObject();
List<Throwable> errors = new ArrayList<Throwable>();

Flowable.fromIterable(listOfFiles)
    .flatMapIterable(file -> Files.readLines(file))
    .parallel()
    .runOn(Schedulers.computation())
    .map(line -> {
         List<Runnable> toExecute = new ArrayList<>();
         try {
             toExecute.add(() -> { o.lines++; });

             if (line.contains("error")) {
                 toExecute.add(() -> { o.errorLines++; });
             }
         } catch (Throwable ex) {
             toExecute.add(() -> { if (errors.size() < 100) { errors.add(ex); } });
         }
         return toExecute;
    })
    .sequential()
    .blockingSubscribe(list -> list.forEach(Runnable::run), Throwable::printStackTrace);

In your case, you'd probably calculate something in "map", then create a Runnable that sets a field/calls a setter on "o" with the calculated value.

--
Best regards,
David Karnok


Re: Awaiting a set of tasks on an ExecutorService

Shevek
In reply to this post by Josh Humphries-2
Thank you all.

I now have this:

https://gist.github.com/shevek/5f8b3fad55c346ca1391349404a2c0fa

It's meant to allow multiple submits and awaits from various threads;
the caller is responsible for making sure that everyone stops calling
submit() before anyone calls await() in earnest.

I do need backpressure; I've been achieving that with an
ArrayBlockingQueue(ncpus * 10) and a CallerRunsPolicy, and the *10 is
meant to deal with the unevenness in the task sizes. Is that sane?
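That arrangement might be sketched as follows (hypothetical names; the pool size and queue bound follow the description above). When the bounded queue fills, CallerRunsPolicy makes the submitting thread run the task itself, which naturally throttles submission:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        int ncpus = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                ncpus, ncpus,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(ncpus * 10),        // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy());  // overflow runs in submitter
        for (int i = 0; i < 1000; i++) {
            pool.execute(() -> { /* expensive work */ });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("done");
    }
}
```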

S.


Re: Awaiting a set of tasks on an ExecutorService

Josh Humphries-2
On Fri, Aug 18, 2017 at 5:38 PM, Shevek <[hidden email]> wrote:
Thank you all.

I now have this:

https://gist.github.com/shevek/5f8b3fad55c346ca1391349404a2c0fa


It's meant to allow multiple submits and awaits from various threads; the caller is responsible for making sure that people stop calling submit() before anyone calls await() in seriousness.

I do need backpressure; I've been achieving that with an ArrayBlockingQueue(ncpus * 10) and a CallerRunsPolicy, and the *10 is meant to deal with the unevenness in task sizes. Is that sane?

Definitely sane! ThreadPoolExecutor's rejection policy plus a fixed-capacity queue is the standard way to do backpressure. I would have suggested it, but wasn't sure whether you needed to re-use the ExecutorService because it was some otherwise-shared resource. (In that case, relying on it for backpressure, rather than on a separate mechanism, could starve other clients of the thread pool.)
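For concreteness, here is a minimal sketch of that configuration; the class name, the pool sizing, and the *10 queue multiplier are illustrative, not prescriptive:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPools {
    // A fixed-size pool with a bounded work queue. When the queue fills up,
    // CallerRunsPolicy runs the rejected task in the submitting thread,
    // which naturally throttles the producer -- i.e. backpressure.
    public static ThreadPoolExecutor newBoundedPool() {
        int ncpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                ncpus, ncpus,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(ncpus * 10),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

Note that CallerRunsPolicy only applies on rejection, so the submitting thread keeps feeding the queue at full speed until it fills; the bound on the queue is what caps the number of buffered tasks.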

 

S.

On 08/18/2017 02:06 PM, Josh Humphries wrote:
Sorry that my suggestion got broken into two messages. Indeed, my first suggestion was incomplete, but the second suggestion, of using a CompletionService, should suffice. Would you queue up 50 million tasks in the scenario you describe, or do you also need a solution that provides back-pressure?

Without back-pressure, it's simple enough to submit everything to a CompletionService and then poll as many as you submitted when done. But that also will buffer up all results as they complete (and if using a limited-capacity queue, it could deadlock if it's the same thread submitting as polling).

If you need backpressure, I'd imagine something more like this:

  * One thread pulling tasks as they come and submitting to the
    CompletionService.
      o Before submitting each task:
          + Acquire a permit from a semaphore that has been setup with
            the maximum number of in-flight/queued tasks you want.
          + Increment a counter (e.g. AtomicLong).
      o After submitting the last task (e.g. end of stream has been
        detected), set a flag (AtomicBoolean) and then also submit a
        no-op task (sentinel) to the CompletionService.
          + Acquire permit and increment counter, even for the no-op task
  * Have a second thread that is awaiting completion of all tasks. It is
    in a loop polling the CompletionService.
      o After polling a task, add exception to list of failures if list
        is not already maxed out (e.g. already extracted 100 failures).
      o Also release a permit back to the semaphore and decrement the
        counter.
      o If counter has reached zero and flag is set, all tasks are
        processed. Done!
      o Otherwise, go back to top of loop, to poll next task.
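A rough translation of the recipe above into code; everything here (the class and method names, running the reaper as a plain Thread, the parameter choices) is my own sketch rather than a drop-in implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class StreamReaper {
    /** Runs every task, bounding in-flight work, and returns up to maxFailures causes. */
    public static List<Throwable> runAll(Iterable<Runnable> tasks,
                                         ExecutorService pool,
                                         int maxInFlight, int maxFailures)
            throws InterruptedException {
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        Semaphore permits = new Semaphore(maxInFlight);  // caps in-flight/queued tasks
        AtomicLong outstanding = new AtomicLong();       // submitted but not yet reaped
        AtomicBoolean doneSubmitting = new AtomicBoolean();
        List<Throwable> failures = new ArrayList<>();

        // Reaper thread: takes completed futures, records up to maxFailures
        // causes, releases a permit per completion, and stops once the counter
        // hits zero with the end-of-stream flag set.
        Thread reaper = new Thread(() -> {
            try {
                while (true) {
                    try {
                        cs.take().get();
                    } catch (ExecutionException e) {
                        if (failures.size() < maxFailures)
                            failures.add(e.getCause());
                    }
                    permits.release();
                    if (outstanding.decrementAndGet() == 0 && doneSubmitting.get())
                        return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reaper.start();

        // Submitter: acquire a permit and bump the counter before each submit.
        for (Runnable task : tasks) {
            permits.acquire();
            outstanding.incrementAndGet();
            cs.submit(task, null);
        }
        // End of stream: count the sentinel before setting the flag, so the
        // counter cannot hit zero with the flag set until the sentinel is reaped.
        permits.acquire();
        outstanding.incrementAndGet();
        doneSubmitting.set(true);
        cs.submit(() -> { }, null);  // no-op sentinel

        reaper.join();
        return failures;
    }
}
```

The failure list is touched only by the reaper thread, and reaper.join() gives the caller a happens-before edge on it, so no extra synchronization is needed there.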


----
*Josh Humphries*
[hidden email] <mailto:[hidden email]>

On Fri, Aug 18, 2017 at 4:54 PM, Shevek <[hidden email] <mailto:[hidden email]>> wrote:

    I can't use CountDownLatch because of the streamy nature of the
    source. I have no idea how many tasks there are going to be up
    front. I read an incoming stream, it turns out there's about 50
    million in it, and this is going to go up by the usual orders of
    magnitude.

    I could use a List of CountDownLatches each with ... but then I
    rapidly get into "Someone cleverer than me MUST have solved this
    before" territory.

    I can do my own tricks with an AtomicLong and wait()/notify(), but I
    have to make sure the master thread calls get() on the relevant
    Future each time a job finishes, otherwise there isn't a
    happens-before relationship with the master thread, so I'd need the
    job to put its own Future onto a Deque, or something...?

    The tasks all return Void, but it's nice to collect the first few
    exceptions (currently there are about 750,000 total exceptions thrown).

    S.



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Awaiting a set of tasks on an ExecutorService

Josh Humphries-2
On Fri, Aug 18, 2017 at 5:38 PM, Shevek <[hidden email]> wrote:
Thank you all.

I now have this:

https://gist.github.com/shevek/5f8b3fad55c346ca1391349404a2c0fa

It's meant to allow multiple submits and awaits from various threads; the caller is responsible for making sure that people stop calling submit() before anyone calls await() in seriousness.

Thinking about this a bit more, I think Nathan Reynolds's suggestion to use a Phaser, instead of my CompletionService idea, would be better. It produces less garbage (you don't need to create and queue futures -- "execute" the tasks instead of "submit"-ting them), and it lets you skip the logic that drains the completion-service queue as tasks are submitted to keep too many from accumulating (which is non-deterministic and could still leave a large number of futures queued up if many tasks complete in between calls to add new tasks).

As tasks are submitted, "register" with the phaser. Then you need only "await" the phaser's current/latest phase at the end. You'd also decorate each task to catch any throwables (recording up to 100, or whatever limit, in a concurrent or synchronized collection) and finally "arrive" with the phaser on completion.

Phaser's API is a bit confusing IMO (it supports far more complex use cases than this one). I've written simple "up-and-down" latches (on top of AbstractQueuedSynchronizer) that make for easier-to-read usages (more or less like Go's sync.WaitGroup), but I can't seem to find a link to one right now (I guess the ones I've written weren't in any open-source projects). If I find one, I'll send it in case it helps.
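A hedged sketch of what that Phaser pattern might look like; the TaskSet name and the failure-collection details are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;

public class TaskSet {
    private final Executor executor;
    private final Phaser phaser = new Phaser(1); // party #1 is the awaiting caller
    private final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();
    private final int maxFailures;

    public TaskSet(Executor executor, int maxFailures) {
        this.executor = executor;
        this.maxFailures = maxFailures;
    }

    /** Must not be called concurrently with await() for the same batch. */
    public void submit(Runnable task) {
        phaser.register();                     // one party per in-flight task
        executor.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Best-effort cap: concurrent checks may overshoot slightly.
                if (failures.size() < maxFailures)
                    failures.add(t);
            } finally {
                phaser.arriveAndDeregister();  // task done
            }
        });
    }

    /** Blocks until every submitted task has arrived; returns recorded failures. */
    public List<Throwable> await() {
        phaser.arriveAndAwaitAdvance();        // caller arrives last; phase advances
        return new ArrayList<>(failures);
    }
}
```

Because the executor is only ever handed plain Runnables via execute(), several TaskSets can share one ExecutorService and be awaited independently, which matches the "sets of tasks" bonus requirement from the original post.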





_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest