Synchronization primitives in Reactive Streams implementations


JSR166 Concurrency mailing list
Hello,

Let me start this discussion as a branch of the "Reactive Streams utility API"
thread here:

  http://mail.openjdk.java.net/pipermail/core-libs-dev/2018-September/055671.html

I would like to talk about a particular synchronization primitive I find to be
repeatedly reinvented in Reactive Streams implementations. I hope that we could
discuss different incarnations of this primitive and maybe extract it into a
reusable component.

The primitive in question assists a number of parties in executing tasks in a
sequential and non-blocking fashion without having to use a dedicated thread of
execution. Though specifics may vary, the core of the primitive consists of a
tiny protocol for bidirectional communication between parties, a backlog of
tasks and an executor to execute the tasks in. All this is packed into a single
method for a party to invoke. When a party invokes the method, it adds the
passed task to the backlog and then checks whether it must handle the task
itself or may simply return. If it must handle the task, it executes a special
service task in the executor and returns. From that moment and up until the
service task has finished, all tasks added to the backlog are handled by that
service task. The service task finishes when the backlog becomes empty.
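[A minimal sketch of the primitive described above, assuming an atomic counter that doubles as the backlog size and a `ConcurrentLinkedQueue` as the backlog; the class and method names are mine, not taken from any of the implementations referenced below:]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

/** Executes submitted tasks sequentially, without a dedicated thread. */
final class WorkDumpingExecutor implements Executor {

    private final Queue<Runnable> backlog = new ConcurrentLinkedQueue<>();
    private final AtomicLong count = new AtomicLong();
    private final Executor executor;

    WorkDumpingExecutor(Executor executor) { this.executor = executor; }

    @Override
    public void execute(Runnable task) {
        backlog.add(task);
        // The party that moves the counter from 0 to 1 must schedule
        // the "service task"; every other party may simply return.
        if (count.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        do {
            // non-null by the protocol: the offer happened before the
            // increment, so count > 0 implies a queued task
            backlog.poll().run();
        } while (count.decrementAndGet() != 0);
    }
}
```

[Note that with a calling-thread executor a task submitted reentrantly is merely enqueued, so it still runs after the current task completes, which is exactly the non-recursive behaviour described above.]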

In other words, parties are busy with dumping their tasks on each other. Either
a party gets lucky and can rest assured the task it has just added will be
looked after, or it ends up with executing its task and, possibly, many others
dumped on it by its luckier peers. In an extreme case where the underlying
executor is a "calling thread executor" (`Executor executor = Runnable::run`),
an unlucky party can really get buried in work. A metaphor for this strategy
could be "work dumping" (or "work foisting").

Without diving into too much detail, here are two components I would like to
start with:

  1. MutexExecutor
  https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/MutexExecutor.java
  2. SequentialScheduler[^1]
  http://hg.openjdk.java.net/jdk/jdk11/file/ee6f7a61f3a5/src/java.net.http/share/classes/jdk/internal/net/http/common/SequentialScheduler.java

I believe that a similar (though a bit more sophisticated) primitive was implemented
in `java.util.concurrent.SubmissionPublisher`.

Do you think they fit into the idea described above? Could there be some common
mechanics extracted? Is there any potentially reusable component? Are there any
other contexts in which this can be used?

Thanks,
-Pavel

[^1]: `SequentialScheduler` evolved from the much simpler `CooperativeHandler` http://hg.openjdk.java.net/jdk9/sandbox/jdk/file/7c7a3c48196e/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java
by adding asynchronous tasks and an underlying executor.

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Synchronization primitives in Reactive Streams implementations

We call this trampolining or queue-drain in ReactiveX and the concept was already invented by the time we switched to such lock-free coordination.

There are two typical implementations. The first reuses one of the involved threads to do as much work as possible:

concurrentQueue.offer(ThingsToDo);
if (wip.getAndIncrement() == 0) {
   do {
      var todo = concurrentQueue.poll();
      handle(todo);
   } while (wip.decrementAndGet() != 0);
}

Another one uses a dedicated executor (scheduler, worker, actor, etc.) to perform the loop body:

concurrentQueue.offer(ThingsToDo);
if (wip.getAndIncrement() == 0) {
   executor.execute(() -> {
      do {
         var todo = concurrentQueue.poll();
         handle(todo);
      } while (wip.decrementAndGet() != 0);
   });
}

The latter has the benefit that the handle() method will run non-overlappingly even if the executor is multi-threaded; it will occupy one of its threads as long as possible and the atomic operation on wip makes sure the do-loop body gets executed exclusively as well. Also unlike the first one, this variant could be made fair by allowing it to get interleaved by other tasks on the same executor.
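[The "fair" flavour mentioned above could be sketched as follows (my own illustration, not taken from any particular library): instead of draining the whole backlog in one executor submission, the service task handles a single item and then re-submits itself, letting unrelated tasks on the same executor interleave:]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

/** Serializes tasks, yielding the executor back after every single task. */
final class FairSerializer {

    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong wip = new AtomicLong();
    private final Executor executor;

    FairSerializer(Executor executor) { this.executor = executor; }

    void submit(Runnable task) {
        queue.offer(task);
        if (wip.getAndIncrement() == 0) {
            executor.execute(this::drainOne);
        }
    }

    private void drainOne() {
        queue.poll().run();                   // handle exactly one task...
        if (wip.decrementAndGet() != 0) {
            executor.execute(this::drainOne); // ...then yield and re-submit
        }
    }
}
```

[Fairness only buys anything with a real executor; with `Runnable::run` the re-submission turns into plain recursion, one frame per pending task.]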

The reason we don't have one general abstraction for these is that reactive operators inline other features/behavior into this loop, which makes them more concise and performant.



--
Best regards,
David Karnok


Re: Synchronization primitives in Reactive Streams implementations

Hi Pavel,

It's an interesting problem.
Come to think about it, besides the MutexExecutor, I've also got this and that.



--
Cheers,


Re: Synchronization primitives in Reactive Streams implementations



> On 28 Sep 2018, at 15:25, Dávid Karnok <[hidden email]> wrote:
>

> We call this trampolining or queue-drain in ReactiveX and the concept was already invented by the time we switched to such lock-free coordination.

Wow! Did you recognize the idea from the description I gave or from the
implementations I referred to? Anyhow, it's good to learn one more name for
this idea. For the sake of tracing the origins and discovering more use cases,
could you perhaps recall how exactly you learned about this concept? You said
it had already been invented by the time you switched to it in ReactiveX; did
you see it somewhere?

> There are two typical implementations.

And more flavours thereof. Having or not having an executor is only one of the
choices an implementor has to make[^1]. The nature of the task, the way the
backlog is represented, reentrancy, fairness and error handling -- all these are
important. The result is that a particular implementation may look quite
different from the examples you've provided. However, no matter which
implementation we pick, the core stays the same. An executor, a cumulating
backlog, and a state machine with at least 3 states[^2].
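[To illustrate the three states, here is my own sketch for the special case of one fixed, repeatedly-signalled task (the state names and shape are mine, only loosely in the spirit of what SequentialScheduler does). Multiple signals during a run coalesce into a single re-run, which is exactly why a plain boolean is not enough but a third state is:]

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Runs one fixed task sequentially; a signal during a run schedules a re-run. */
final class SignalledRunner {

    private static final int IDLE = 0, RUNNING = 1, RERUN = 2;

    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final Runnable task;
    private final Executor executor;

    SignalledRunner(Runnable task, Executor executor) {
        this.task = task;
        this.executor = executor;
    }

    void signal() {
        while (true) {
            int s = state.get();
            if (s == IDLE) {
                if (state.compareAndSet(IDLE, RUNNING)) {
                    executor.execute(this::run);
                    return;
                }
            } else if (s == RUNNING) {
                if (state.compareAndSet(RUNNING, RERUN)) {
                    return; // the current run will notice and loop again
                }
            } else {
                return; // RERUN: a re-run is already pending
            }
        }
    }

    private void run() {
        while (true) {
            task.run();
            if (state.compareAndSet(RUNNING, IDLE)) {
                return;             // no signal arrived during the run
            }
            state.set(RUNNING);     // consume the RERUN and go again
        }
    }
}
```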

> The reason we don't have one abstraction for these in general is the inlining of other features/behavior of reactive operators while making them more concise and performant.

Maybe you could provide a couple of examples showing that abstracting out the
common bits would significantly hinder performance or conciseness? The reason
I'm asking is that I find this idea mentally difficult to discover and
error-prone to implement. I still hope we can do something about it.



[^1]: Though once again, a "calling thread executor" may help with the
"always use an executor" choice.

[^2]: For example, we can't get away with switching to `AtomicBoolean wip` in
the examples you've provided, as this way we will risk missing tasks added to
the backlog. By the way, does "wip" stand for "Work In Progress"?
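[The lost-task risk with a hypothetical `AtomicBoolean wip` can be replayed deterministically on a single thread by interleaving the two parties' steps by hand; this sketch is my own, not from any of the referenced code:]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical broken protocol: offer, then CAS wip false->true to become
// the drainer; the drainer polls until empty and finally clears wip.
final class LostTaskDemo {

    /** Replays the bad interleaving; returns the task left stranded. */
    static String replay() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        AtomicBoolean wip = new AtomicBoolean();

        // Thread A submits and wins the CAS, becoming the drainer:
        queue.offer("task1");
        wip.compareAndSet(false, true); // true: A drains
        queue.poll();                   // A handles task1; the queue looks
                                        // empty, so A's loop decides to exit...
        // ...but before A clears wip, thread B submits:
        queue.offer("task2");
        wip.compareAndSet(false, true); // false: B assumes the drainer has it
        wip.set(false);                 // A clears wip and exits
        return queue.peek();            // "task2" is stranded; nobody drains
    }
}
```

[A counter does not have this problem: B's increment forces A's final decrement to observe a non-zero count, so A loops once more.]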


Re: Synchronization primitives in Reactive Streams implementations

> On 30 Sep 2018, at 23:30, Viktor Klang <[hidden email]> wrote:
>
> Come to think about it, besides the MutexExecutor, I've also got this and that.
>

Oh my! It will take me some time to read Scala code. Sorry about that.
Meanwhile, maybe you could answer some questions I have on `MutexExecutor`?

1. I don't know whether this question should go to you or James Roper. The way
`MutexExecutor`[^1] is used in microprofile-reactive-streams-lightbend[^2]
hints that we could probably use a simpler specialization of the `MutexExecutor`
leveraging the fact that the task is always the same. Maybe only for the sake of
generating less garbage?

2. Looking at line 63. Was there a particular reason you used busy-waiting
instead of, say, reusing `ConcurrentLinkedQueue` or making threads help each
other?

3. I have little experience with the VarHandle API, but I have a question :-)
Would it break the code if we switched to `getAcquire()` instead of `get()` on
line 63? The corresponding write is `lazySet`, or VarHandle `setRelease`, on
line 31.



[^1]: https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/MutexExecutor.java
[^2]: https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java#L108

Re: Synchronization primitives in Reactive Streams implementations

Interesting to see this discussion of a MutexExecutor concept, because
I've implemented something called GuardedExecutor that I'm planning to
share soon. I'll put it up in its own GitHub repo at some point, but for
now I've posted the API as a Guava feature request (since I see it as
the natural evolution of Guava's Monitor, which I wrote several years
ago):

https://github.com/google/guava/issues/3265

Once the code is posted I will start a new thread here to drum up
interest. Just wanted to share the idea on this thread in case anyone's
exploring similar use cases.

Cheers,
Justin




Re: Synchronization primitives in Reactive Streams implementations

I have a similar Executor to your dedicated example.  It's called SerializingExecutor and does the same thing but with one fewer atomic operation.   

I don't think it is easily extractable because some decisions cannot be made automatically:

- What should happen on an exception?  Does the queue stop draining?  Should it power through?  If it stops, should the next execute() call keep going?
- Can a serializing executor wrap another serializing executor?  The "serial"-ness gets tricky fast with multiple layers, because the order of executions can change.
- CLQ is MPMC, but it could feasibly be optimized to MPSC, since there is only one thread draining the queue (and, in special circumstances, SPSC).
- For single-thread scenarios, serial execution can be done without synchronization.  It's possible to avoid reentrancy more cheaply, especially when wrapping a direct executor.  It's unclear what the commonality between the properly synchronized and unsynchronized ones is.



Re: Synchronization primitives in Reactive Streams implementations

> On 2 Oct 2018, at 00:35, Carl Mastrangelo <[hidden email]> wrote:
>
> I have a similar Executor to your dedicated example.  It's called SerializingExecutor and does the same thing but with one fewer atomic operation.  

Thanks for the link. On an unrelated note, I've looked through the repository
and now have a question. Are these related?

  https://github.com/grpc/grpc-java/blob/60a0b0c471d720b0546ee3a5b4fa4283635dfbcf/core/src/main/java/io/grpc/internal/SerializingExecutor.java

and

  https://github.com/google/guava/blob/0cd4e9faa1360da4a343f84cb275d6eda0c5e732/android/guava/src/com/google/common/util/concurrent/SequentialExecutor.java       

> I don't think it is easily extractable because some decisions cannot be made automatically:

What do you mean by "making decisions automatically"? I hope we could make these
decisions configurable.

> - What should happen on an exception?  Does the queue stop draining?  Should it power through it?  If it stops, should the next execute() call keep going?

How many practical strategies for handling execution errors in Executors are
there? If there are only a couple of strategies, we can enumerate them;
otherwise we can provide an extension mechanism (Strategy, Template Method,
and similar patterns)?
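[For illustration, an enumerated pair of strategies could plug into the drain loop like this; the names and the class are entirely hypothetical, a sketch of the Strategy approach rather than any existing API:]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

final class SerializedExecutor implements Executor {

    /** What the drain loop does when a task throws. */
    enum ErrorStrategy {
        CONTINUE, // power through: keep draining the backlog
        STOP      // abandon the backlog; the next execute() starts afresh
    }

    private final Queue<Runnable> backlog = new ConcurrentLinkedQueue<>();
    private final AtomicLong count = new AtomicLong();
    private final Executor executor;
    private final ErrorStrategy onError;

    SerializedExecutor(Executor executor, ErrorStrategy onError) {
        this.executor = executor;
        this.onError = onError;
    }

    @Override
    public void execute(Runnable task) {
        backlog.add(task);
        if (count.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        do {
            try {
                backlog.poll().run();
            } catch (RuntimeException e) {
                if (onError == ErrorStrategy.STOP) {
                    backlog.clear(); // drop the rest (racy in general;
                    count.set(0);    // fine for a single-threaded sketch)
                    return;
                }
                // CONTINUE: swallow; a real implementation would report it
            }
        } while (count.decrementAndGet() != 0);
    }
}
```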

> - Can a serializing executor wrap another serializing executor?  The "serial" ness of this gets tricky fast with multiple, because the order of executions can change.

Could you maybe provide an example where the order of executions changes?

> - CLQ is MPMC, but it could feasibly be optimized to MPSC, since there is only one thread draining the queue.  (and in special circumstances, be SPSC).  

Right. One thread at a time. The said optimizations in queueing are possible,
sure. I'd be happy to discuss them.

> - For single thread scenarios, serial execution can be done without synchronization.  It's possible to avoid reentrancy more cheaply, especially when wrapping a direct executor.

I don't see why this can't be configurable.

>  It's unclear what the commonality between the properly synchronized and unsynchronized ones is.

Do you mean the single-threaded case? If so, then more likely "it's unclear what
the difference is."

Re: Synchronization primitives in Reactive Streams implementations

Hi Justin,

I've skimmed through the `Monitor` javadoc here

  https://google.github.io/guava/releases/snapshot-jre/api/docs/com/google/common/util/concurrent/Monitor.html

It looks like it's a neat tool. Unfortunately, I haven't had a chance to use it.
I'd be interested in looking at `GuardedExecutor` too. Please keep us posted.

Thanks,
-Pavel


Re: Synchronization primitives in Reactive Streams implementations

Responses inline
On Tue, Oct 2, 2018 at 11:43 AM Pavel Rappo <[hidden email]> wrote:
> On 2 Oct 2018, at 00:35, Carl Mastrangelo <[hidden email]> wrote:
>
>> I have a similar Executor to your dedicated example.  It's called SerializingExecutor and does the same thing but with one fewer atomic operation.

> Thanks for the link. On an unrelated note, I've looked through the repository
> and now have a question. Are these related?
>
>   https://github.com/grpc/grpc-java/blob/60a0b0c471d720b0546ee3a5b4fa4283635dfbcf/core/src/main/java/io/grpc/internal/SerializingExecutor.java
>
> and
>
>   https://github.com/google/guava/blob/0cd4e9faa1360da4a343f84cb275d6eda0c5e732/android/guava/src/com/google/common/util/concurrent/SequentialExecutor.java
>
>> I don't think it is easily extractable because some decisions cannot be made automatically:
>
> What do you mean by "making decisions automatically"? I hope we could make these
> decisions configurable.
>
>> - What should happen on an exception?  Does the queue stop draining?  Should it power through it?  If it stops, should the next execute() call keep going?
>
> How many practical strategies of handling execution errors in Executors are
> there? If there are only a couple of strategies, we can enumerate them,
> otherwise provide an extension mechanism (patterns Strategy, Template method,
> etc.)?

One of the cases that came up is what to do about interruption. If the answer is to reset the interrupted bit and throw InterruptedException, the executor has to declare it in its throws clause. If it keeps going, any blocking operations in the runnables are going to fail, causing all submitted tasks that block to exit early (which is weird). SerializingExecutor is not thread oriented. Additionally, does it bubble up to the Thread's uncaught exception handler?

 

>> - Can a serializing executor wrap another serializing executor?  The "serial" ness of this gets tricky fast with multiple, because the order of executions can change.
>
> Could you maybe provide an example where the order of executions changes?


Sure. Suppose, as an optimization, you wanted to avoid allocating SerializingExecutors when one wraps another, i.e. SE(SE(E)), where E is the innermost. There is no reason to allocate a new outermost SE, because the inner one already guarantees sequential ordering. Alternatively, if wrapping one SE in another SE always returned a new one, using them in separate places could result in accidental serial execution even if you did not want it. This came up when Guava's sequentialExecutor() method was made public. The optimization (and any possible confusion) was lost, since MoreExecutors.newSequentialExecutor() always returns a new instance.
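[The collapsing optimization described above could be sketched like this; the factory, the marker interface, and the inner wrapper are all hypothetical (in particular, Guava's real newSequentialExecutor does not do this):]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

final class SerialExecutors {

    /** Marker: an executor that already guarantees serial, FIFO execution. */
    interface SerialExecutor extends Executor {}

    /** SE(SE(E)) collapses to SE(E): an already-serial delegate is returned as-is. */
    static Executor serialize(Executor delegate) {
        if (delegate instanceof SerialExecutor) {
            // Two call sites serializing the same serial executor now share
            // ordering whether or not they wanted to -- the "accidental
            // serial execution" caveat.
            return delegate;
        }
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        AtomicLong wip = new AtomicLong();
        return (SerialExecutor) task -> {
            queue.offer(task);
            if (wip.getAndIncrement() == 0) {
                delegate.execute(() -> {
                    do {
                        queue.poll().run();
                    } while (wip.decrementAndGet() != 0);
                });
            }
        };
    }
}
```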
 

>> - CLQ is MPMC, but it could feasibly be optimized to MPSC, since there is only one thread draining the queue.  (and in special circumstances, be SPSC.)
>
> Right. One thread at a time. The said optimizations in queueing are possible,
> sure. I'd be happy to discuss them.

To wrap up the queue for external usage, you have to trust that the caller knows how many readers and writers there are. That would make the API uglier, for the possibility of having a faster queue.
 

>> - For single thread scenarios, serial execution can be done without synchronization.  It's possible to avoid reentrancy more cheaply, especially when wrapping a direct executor.
>
> I don't see why this can't be configurable.

You're right.  The issue is that with so many configuration options, it doesn't really feel like there is much reuse, which was the original point.  Also, too many options would be bad API design.
 

>  It's unclear what the commonality is between the properly synchronized and the unsynchronized one.

Do you mean the single-threaded case? If so, then more likely "it's unclear what
the difference is."



 

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list


On Tue., 2 Oct. 2018, 01:48 Pavel Rappo via Concurrency-interest, <[hidden email]> wrote:


> On 28 Sep 2018, at 15:25, Dávid Karnok <[hidden email]> wrote:
>

> We call this trampolining or queue-drain in ReactiveX and the concept was already invented by the time we switched to such lock-free coordination.

Wow! Did you recognize the idea from the description I gave or from those
implementations I referred to? Anyhow, it's good to learn one more name of this
idea. For the sake of tracing the origins and discovering more use cases could
you maybe remember how exactly you learned about this concept? You said it had
been already invented by the time you switched to it in ReactiveX? Did you see
it somewhere?

While all the executors mentioned are trampolining executors, trampolining itself is a concept orthogonal to the mutually exclusive serialisation of executed tasks. Trampolining is a technique used to solve infinite recursion problems, implemented by unrolling the stack and then bouncing back to the callback. Not every trampolining executor implements mutually exclusive execution of tasks. For example, you can implement a trampoline using a thread local to hold the task queue; this achieves trampolining (unrolling the stack to prevent infinite recursion) but does not achieve mutual exclusion. One reason executors used by Reactive Streams tend to be trampolining is that the Reactive Streams spec requires implementations to prevent infinite recursion, so trampolines are useful for that.
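To illustrate the distinction, here is a minimal sketch (the class name is made up) of exactly such a thread-local trampoline: it unrolls recursion on each thread, but every thread drains its own queue, so tasks submitted from different threads may still run concurrently — trampolining without mutual exclusion:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical sketch: a per-thread trampoline.  Re-entrant submissions are
// queued and run by the outermost call's drain loop instead of recursing.
final class ThreadLocalTrampoline implements Executor {
    private final ThreadLocal<Queue<Runnable>> queue = new ThreadLocal<>();

    @Override
    public void execute(Runnable task) {
        Queue<Runnable> q = queue.get();
        if (q != null) {        // re-entrant call: defer to the outer drain loop
            q.add(task);
            return;
        }
        q = new ArrayDeque<>();
        queue.set(q);
        try {
            task.run();
            Runnable next;
            while ((next = q.poll()) != null) {
                next.run();      // runs deferred tasks with an unrolled stack
            }
        } finally {
            queue.remove();
        }
    }
}
```

Two threads calling `execute` concurrently each get their own `ArrayDeque`, which is precisely why this gives no mutual exclusion.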

This 2009 blog post by a former colleague talks about trampolines:

  http://blog.richdougherty.com/2009/04/tail-calls-tailrec-and-trampolines.html?m=1

It references an earlier discussion of trampolines in Scala 2.8, but that link is dead.


> There are two typical implementations.

And more flavours thereof. Having or not having an executor is only one of the
choices an implementor has to make[^1]. The nature of the task, the way the
backlog is represented, reentrancy, fairness and error handling -- all these are
important. The result is that a particular implementation may look quite
different from the examples you've provided. However, no matter which
implementation we pick, the core stays the same. An executor, a cumulating
backlog, and a state machine with at least 3 states[^2].

> The reason we don't have one abstraction for these in general is the inlining of other features/behavior of reactive operators while making them more concise and performant.

Maybe you could provide a couple of examples showing that abstracting out the
common bits would significantly hinder performance or conciseness? The reason
I'm asking is that I find discovering this idea mentally difficult and
implementing it error-prone. I still hope we can do something about it.



[^1]: Though once again, a "calling thread executor" may help with the
"always use an executor" choice.

[^2]: For example, we can't get away with switching to `AtomicBoolean wip` in
the examples you've provided, as this way we will risk missing tasks added to
the backlog. By the way, does "wip" stand for "Work In Progress"?


Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list


On Mon, Oct 1, 2018 at 6:12 PM Pavel Rappo <[hidden email]> wrote:
> On 30 Sep 2018, at 23:30, Viktor Klang <[hidden email]> wrote:
>
> Come to think about it, besides the MutexExecutor, I've also got this and that.
>

Oh my! It will take me some time to read Scala code. Sorry about that.
Meanwhile, maybe you could answer some questions I have on `MutexExecutor`?

1. I don't know whether this question should go to you or James Roper. The way
`MutexExecutor`[^1] is used in microprofile-reactive-streams-lightbend[^2]
hints that we could probably use a simpler specialization of the `MutexExecutor`
leveraging the fact that the task is always the same. Maybe only for the sake of
generating less garbage?

We do that in Akka—non-blocking "resubmission" of cached "Runnables".
 

2. Looking at line 63. Was there a particular reason you used busy-waiting
instead of, say, reusing `ConcurrentLinkedQueue` or making threads help each
other?

63 is to close the gap between 29 and 31.
 

3. I have little experience with the VarHandle API, but I have a question :-)
Would it break the code if we switched to `getAcquire()` instead of `get()`,
line 63? The corresponding operation is `lazySet`, or VarHandle `setRelease`,
line 31.

At first glance, yes that should be possible. Not sure there'll be any material gain though. Others: please chime in.
 



[^1]: https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/MutexExecutor.java
[^2]: https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java#L108


--
Cheers,


Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
> On 3 Oct 2018, at 08:11, Viktor Klang <[hidden email]> wrote:
>
> We do that in Akka—non-blocking "resubmission" of cached "Runnables".

Could you please send me a link to the corresponding code?

When the task is the same, not only does it make recreating this task
unnecessary, it also eliminates the need for the task queue. For example,
consider we were to implement a `Publisher<ByteBuffer>` of data read from a
socket. This publisher would publish data only if there are both unfulfilled
demand and data available. The check-and-publish task could then be executed
every time either the demand increases or the socket becomes readable. If both
are satisfied, the task arranges for `SocketChannel.read` and `Subscriber.onNext`
to be called.

Since I started from this point of view I didn't see this primitive as an
executor, but rather a consumer of pending signals. No doubt the sequential
executor can be implemented on top of this primitive and vice-versa.

> 63 is to close the gap between 29 and 31.

Pardon, I might not have been clear. I understand the purpose of this particular
busy-waiting. What I was intending to ask is: why did you choose this approach
over others? I might seem irrational here, but I don't feel comfortable when I
see busy-waiting[^1]. So anytime it appears on the horizon, I try to think of
some other way of achieving the goal.

For example, one option could be to make threads finish updates for each other.
To achieve this the order of updates needs to be changed. First the "next" link
is updated, then the `last`. The downside is increased complexity. I can try to
sketch something if you are interested.

> Others: please chime in.

Yes, please! I'd like to understand the difference between visibility guarantees
for `lazySet` -> `get` and `setRelease` -> `getAcquire`. For some reason this
document[^2] hasn't helped much.



[^1]: I realize that it may seem overkill to protect against a failure happening
between lines 29 and 31 that renders some other thread spinning forever. I also
realize that extra strain this puts on the CPU will probably be negligible in
most cases.
[^2]: http://gee.cs.oswego.edu/dl/html/j9mm.html


Fwd: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
(Replying to list this time :-))


Pavel,

On Thu, Oct 4, 2018 at 12:38 PM Pavel Rappo <[hidden email]> wrote:
> On 3 Oct 2018, at 08:11, Viktor Klang <[hidden email]> wrote:
>
> We do that in Akka—non-blocking "resubmission" of cached "Runnables".

Could you please send me a link to the corresponding code?

Yeah, I could probably see if I can link directly to it, however I'm not sure it is applicable here since it deals with submissions to *any* Executor, no matter if they are multithreaded or not.
It's just a mechanism for being able to resubmit (conditionally of course) Runnables if they have not yet been scheduled (or if they are currently running).
 

When the task is the same, not only does it make recreating this task
unnecessary, it also eliminates the need for the task queue. For example,
consider we were to implement a `Publisher<ByteBuffer>` of data read from a
socket. This publisher would publish data only if there are both unfulfilled
demand and data available. The check-and-publish task could then be executed
every time either the demand increases or the socket becomes readable. If both
are satisfied, the task arranges for `SocketChannel.read` and `Subscriber.onNext`
to be called.

You also need to deal with fairness.
 

Since I started from this point of view I didn't see this primitive as an
executor, but rather a consumer of pending signals. No doubt the sequential
executor can be implemented on top of this primitive and vice-versa.

> 63 is to close the gap between 29 and 31.

Pardon, I might not have been clear. I understand the purpose of this particular
busy-waiting. What I was intending to ask is: why did you choose this approach
over others?

It's the standard way of dealing with that situation. The other alternative is to exit even if you know that there is a pending write, but you'll need to recheck later anyway, and the only way to know when to do that is to implement monitors/wait-lists and then do the park/unpark dance, which is definitely going to be more expensive in the general case for a queue.

Also, remember that the "missed" write (or rather the busy-spin) will only occur when the reader has caught up with the writers.
 
I might seem irrational here, but I don't feel comfortable when I
see busy-waiting[^1]. So anytime it appears on the horizon, I try to think of
some other way of achieving the goal.

Sure, I'm not a huge fan of busy-waiting either, but for all intents and purposes, a naïve implementation of something like incrementAndGet/getAndIncrement could potentially be busy-waiting forever (assuming some form of unfair scheduling). Same for many CAS+retry constructs.
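For illustration, the shape of such a CAS+retry construct — lock-free, but a thread that keeps losing the race retries indefinitely, which is exactly the busy-waiting-in-disguise being alluded to:

```java
import java.util.concurrent.atomic.AtomicInteger;

// A naïve incrementAndGet built from a CAS+retry loop.  Progress is
// guaranteed system-wide (some thread always wins), but an individual
// thread may, in principle, spin forever under unfair scheduling.
final class CasCounter {
    private final AtomicInteger value = new AtomicInteger();

    int incrementAndGet() {
        for (;;) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;          // won the race
            }
            // lost the race to a concurrent writer: retry (a busy-wait)
        }
    }
}
```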
 

For example, one option could be to make threads finish updates for each other.
To achieve this the order of updates needs to be changed. First the "next" link
is updated, then the `last`. The downside is increased complexity. I can try to
sketch something if you are interested.

I haven't found it to be a problem, but if you have any measurement that it is, I'd be more than happy to have a look.
 

> Others: please chime in.

Yes, please! I'd like to understand the difference between visibility guarantees
for `lazySet` -> `get` and `setRelease` -> `getAcquire`. For some reason this
document[^2] hasn't helped much.



[^1]: I realize that it may seem overkill to protect against a failure happening
between lines 29 and 31 that renders some other thread spinning forever. I also
realize that extra strain this puts on the CPU will probably be negligible in
most cases.
[^2]: http://gee.cs.oswego.edu/dl/html/j9mm.html



--
Cheers,


Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
On 10/04/2018 06:38 AM, Pavel Rappo via Concurrency-interest wrote:

> Yes, please! I'd like to understand the difference between visibility guarantees
> for `lazySet` -> `get` and `setRelease` -> `getAcquire`. For some reason this
> document[^2] hasn't helped much.
> [^2]: http://gee.cs.oswego.edu/dl/html/j9mm.html

In general setRelease must be matched with getAcquire for anything
producer-consumer-like that doesn't include Dekker-like scenarios
encountered with blocking synchronization (in which case you generally
need {get,set,cas}Volatile). It is a little confusing here because the
default "get" for AtomicReference is equivalent to VarHandle getVolatile
(coping the best we could with evolving language/APIs). Also, whenever a
read will be validated with a CAS, you could weaken to
AtomicReference.getPlain === VarHandle.get. And the name lazySet is
being retired in part because the name is so bad (use setRelease). Maybe
we should denigrate (deprecate not-for-removal) it.
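To make the pairing concrete, a minimal sketch (class and field names are made up): the release write publishes a plain data field, and the matching acquire read guarantees its visibility to the consumer:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical example of a setRelease/getAcquire pairing in a simple
// producer-consumer handoff (no Dekker-like mutual exclusion involved).
final class Handoff {
    int data;          // plain field, published by the release write below
    boolean ready;     // accessed only through the VarHandle

    static final VarHandle READY;
    static {
        try {
            READY = MethodHandles.lookup()
                    .findVarHandle(Handoff.class, "ready", boolean.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    void produce() {
        data = 42;
        READY.setRelease(this, true);   // like the retired lazySet
    }

    Integer tryConsume() {
        if ((boolean) READY.getAcquire(this)) {
            return data;                // guaranteed to observe the write above
        }
        return null;                    // not published yet
    }
}
```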

While I'm at it: I've been staying out of discussion so far about added
j.u.c support for reactive streams applications, to see where it heads.
One underlying issue seems to be whether any of the component
functionality of SubmissionPublisher should/could be made available in
streamlined form. (BTW, note that it can work with an
Executors.newSingleThreadExecutor.)

Also, as a secondary aside. I like Justin's Guava GuardedExecutor, but
I'm undecided whether it or something like it should be introduced in
j.u.c.

-Doug


Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
> On 3 Oct 2018, at 06:23, James Roper <[hidden email]> wrote:
>
>
> While all the executors mentioned are trampolining executors, trampolining itself is a concept orthogonal to the mutually exclusive serialisation of executed tasks. Trampolining is a technique used to solve infinite recursion problems, implemented by unrolling the stack and then bouncing back to the callback. Not every trampolining executor implements mutually exclusive execution of tasks. For example, you can implement a trampoline using a thread local to hold the task queue; this achieves trampolining (unrolling the stack to prevent infinite recursion) but does not achieve mutual exclusion. One reason executors used by Reactive Streams tend to be trampolining is that the Reactive Streams spec requires implementations to prevent infinite recursion, so trampolines are useful for that.
>
> This 2009 blog post by a former colleague talks about trampolines:
>
> http://blog.richdougherty.com/2009/04/tail-calls-tailrec-and-trampolines.html?m=1
>
> It references an earlier discussion of trampolines in Scala 2.8, but that link is dead.

Thanks for the important clarification and the links. Thankfully, not all dead
links have gone forever:

  https://web.archive.org/web/20090812090720/http://www.nabble.com:80/-scala--Manifest,-implicits-and-apply-for-DSL-td22381717.html


Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
> On 3 Oct 2018, at 01:12, Carl Mastrangelo <[hidden email]> wrote:
>
> One of the cases that came up is what to do for interruption?  If the answer is to reset the interrupted bit and throw IE, the executor has to declare it in the throws.  If it keeps going, any blocking operations in the runnables are going to fail, causing all submitted tasks that block to exit early.  (Which is weird.)  SerializingExecutor is not thread oriented.    Additionally, does it bubble up to the Thread's uncaught exception handler?

This question should be answered in the course of implementation. I must say
that it's getting more difficult for me to answer questions like this one
without writing the corresponding code.
 
> Could you maybe provide an example where the order of executions changes?
>
>
> Sure.  Suppose, as an optimization, you wanted to avoid allocating SerializingExecutors if one wraps another, i.e. SE(SE(E)), where E is the innermost.  There is no reason to allocate a new outermost SE because the inner one guarantees the sequential ordering.  Alternatively, if you assumed wrapping one SE in another SE returned a new one, using them in separate places may result in accidental serial execution even if you did not want it.   This came up when Guava's sequentialExecutor() method was made public.  The optimization (and any possible confusion) was lost since MoreExecutors.newSequentialExecutor() always returns a new instance.

I'm not following. Whether a factory method creates a new executor or reuses an
existing one shouldn't be a problem, if the end result doesn't violate the
properties of the said executor. What do you mean by "accidental serial
execution"? From the point of view of a user, once they have an instance of an
executor that executes their tasks sequentially, they should not care whether or
not this executor is shared.

Please provide relevant links to "This came up when Guava's sequentialExecutor()
method was made public" discussions, so we could better understand the problem.
I've only found the revisions where the method was introduced[^1] and then
renamed[^2]. Unfortunately, those changes do not tell the whole story.

> To wrap up the queue for external usage, you have to trust the caller knows how many readers and writers there are.   That would make the API uglier, for the possibility of having a faster queue.

As you've said, the queue can be drained by (at most) one thread at a time.
Hence, there doesn't seem to be the need for a "Multiple Producers - Multiple
Consumers" queue. Again, we could start with something and iterate, gradually
answering these questions.



[^1]: https://github.com/google/guava/commit/89fb0654edc987741441c5329fee606a4a2a9224#diff-7a493427c77df959295f1d55574dbd50
[^2]: https://github.com/google/guava/commit/1c760101a7fcd8798eb0c6ad3c277ff190bcb22f#diff-7a493427c77df959295f1d55574dbd50


Re: Synchronization primitives in Reactive Streams implementations

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
> On 4 Oct 2018, at 18:14, Doug Lea via Concurrency-interest <[hidden email]> wrote:
>
> (BTW, note that it can work with an
> Executors.newSingleThreadExecutor.)

Yes, the executor returned by the `Executors.newSingleThreadExecutor` method
provides the same _service_ for the user as does the `SequentialExecutor` (or
similar). It's the _implementation_ that might not be satisfactory for some
cases. I think resource-wise this has a lot in common with the problem of
effective mapping of M lightweight threads to N threads.
