Upcoming jdk9 j.u.c JEP

22 messages

Upcoming jdk9 j.u.c JEP

Doug Lea
Thanks to people who have tried out the proposed updates for jdk9
(modest, compared to previous releases) that have been in place since
January -- a few enhancements to CompletableFuture, added class
SubmissionPublisher and its interface-holder class Flow, plus misc
minor updates. If you haven't, try them out from the usual places:

     API specs: http://gee.cs.oswego.edu/dl/jsr166/dist/docs/
     jar file: http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166.jar (compiled using Java 8 javac).
     Browsable CVS sources: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/

(As usual, the easiest way is to run with -Xbootclasspath/p:jsr166.jar
but this will almost surely change under upcoming jdk9 module support.)

We've also been working to improve async performance more generally
inside implementations. Things seem to be stabilizing without
controversy. (There has been continuing discussion of Flow interfaces
at http://www.reactive-streams.org/ but nothing that has impacted
these APIs.)  So it's time to actually propose them for jdk9 as an
OpenJDK JEP, which I plan to do within a few days. If you have any
comments or concerns, please let me know!

Some notes:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks.  The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent, and (2) most net-based frameworks seem to use
custom data representations (e.g., JSON) that are even further out
of scope.  However, class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, and so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe.)

* If you are trying out updates with jdk9 early access
(https://jdk9.java.net/download/), note that as of b73 last week
it uses the G1 collector by default, which is (currently,
but hopefully just transiently) not usually the best choice for
high-throughput concurrency. You probably want to use the switches
  -XX:+UseParallelGC -XX:+UseCondCardMark -XX:-UseBiasedLocking

* The implementation updates (some not yet committed) focus on
improvements in async contexts (CompletableFutures,
SubmissionPublishers, plus other older components).  Performance in
symmetric async settings (where agents may be both producers and
consumers) is inherently more variable than in other cases. ForkJoinPool
in particular now handles these better. Updates include: (1) pool.submit
and task.fork now have identical semantics; (2) "Async" FJP mode is now
probabilistically fair; (3) spare threads cannot accumulate; and (4)
ramp-down is faster on large systems.  (In fact, of the oddities under
abuse that Ed Harned enjoys identifying, I think the only remaining
one is that we still take a global lock to fill in stack traces for
cross-thread exceptions, which can slow things to a crawl if you have
tons of them, but is still worthwhile in helping developers track down
most problems.)

* It is likely that some implementations (and possibly even new
classes) will be impacted by VarHandles-related work (mainly by Paul
Sandoz), which replaced the syntactically nicer but too-problematic
"Enhanced Volatiles" proposal (still the same JEP,
http://openjdk.java.net/jeps/193).
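As a rough illustration of the first note above, here is a minimal sketch written against java.util.concurrent.SubmissionPublisher and Flow as they eventually shipped in JDK 9, which may differ in detail from the preview jar. The class and method names are hypothetical, and the in-memory list stands in for an IO source; the subscriber turns off back-pressure by requesting Long.MAX_VALUE in onSubscribe.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class UnboundedAdapterDemo {
    /** Subscriber that effectively turns off back-pressure by requesting Long.MAX_VALUE up front. */
    static final class UnboundedSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }
        @Override public void onNext(T item) { received.add(item); }
        @Override public void onError(Throwable ex) { terminated.countDown(); }
        @Override public void onComplete() { terminated.countDown(); }
    }

    /** Adapts any Iterable (standing in for an IO source) into a Publisher and collects the output. */
    static <T> List<T> relay(Iterable<T> source) {
        UnboundedSubscriber<T> subscriber = new UnboundedSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (T item : source)
                publisher.submit(item);   // blocks only while the subscriber's buffer is full
        }                                 // close(): onComplete follows the already-buffered items
        try {
            subscriber.terminated.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(relay(List.of("alpha", "beta", "gamma")));
    }
}
```

Replacing the list with a reader loop over a socket or file would turn the same pattern into an IO-backed Publisher, without any IO-specific class in j.u.c.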

-Doug

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Upcoming jdk9 j.u.c JEP

Dávid Karnok
Hello,

I've read through the code and javadoc and I have the following comments:

Flow.java
-------

Line 70: the code schedules the exception with executor.execute(() -> subscriber.onError(ex)); but the subscriber can't cancel this action. For example, a subscriber calling request(-1) and then cancel() will inevitably receive the exception. I understand the RS spec allows for such late-coming events, but since this is an example, it should convey the right practices: one should always keep the Future of a scheduled action so that it can be cancelled.
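A minimal sketch of that suggestion, loosely modelled on the subscription in the Flow javadoc's OneShotPublisher example and assuming the JDK 9 Flow interfaces (the class name is hypothetical). Both the error path and the normal path keep the Future returned by ExecutorService.submit, so a later cancel() can revoke a signal that has not started running yet.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;

/** Hypothetical one-shot subscription that keeps the Future of every scheduled signal. */
final class CancellableOneShotSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Boolean> subscriber;
    private final ExecutorService executor;
    private Future<?> future;   // the single scheduled signal, if any
    private boolean completed;  // true once request() or cancel() has been seen

    CancellableOneShotSubscription(Flow.Subscriber<? super Boolean> subscriber,
                                   ExecutorService executor) {
        this.subscriber = subscriber;
        this.executor = executor;
    }

    @Override public synchronized void request(long n) {
        if (completed) return;
        completed = true;
        if (n <= 0) {
            IllegalArgumentException ex =
                new IllegalArgumentException("non-positive request: " + n);
            // submit() instead of execute(), so cancel() can still revoke the error signal
            future = executor.submit(() -> subscriber.onError(ex));
        } else {
            future = executor.submit(() -> {
                subscriber.onNext(Boolean.TRUE);
                subscriber.onComplete();
            });
        }
    }

    @Override public synchronized void cancel() {
        completed = true;
        // cancel(false): a signal that has not started will never run;
        // one that is already running is allowed to finish.
        if (future != null) future.cancel(false);
    }
}
```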

Line 98: the text reads "Subscriber maintains multiple Subscriptions", and I don't see what that would look like; I think the RS spec at least implies this shouldn't happen.

Line 114: the count variable is set after the request is issued, so a synchronous onNext call will request bufferSize - bufferSize / 2 immediately, giving a total of ~1.5 * bufferSize outstanding requests. In addition, if the generator emits onNext events asynchronously, a stall just before the count assignment may have the same effect: onNext may experience a torn read of count, or onSubscribe and onNext may overwrite each other's count value. I suggest assigning count before the initial request.

Line 314: the same issue is present: count is assigned after the request is issued.
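A sketch of the suggested ordering, following the shape of the javadoc's SampleSubscriber (bufferSize, count, subscription, consumer) and assuming the JDK 9 Flow interfaces; the done future is a hypothetical addition used only to observe completion. Because count is assigned before the first request(), an onNext() issued synchronously from inside request() already sees the right threshold.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

class SampleSubscriber<T> implements Flow.Subscriber<T> {
    final Consumer<? super T> consumer;
    final long bufferSize;
    final CompletableFuture<Void> done = new CompletableFuture<>(); // hypothetical completion hook
    Flow.Subscription subscription;
    long count;

    SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        long initialRequestSize = bufferSize;
        count = bufferSize - bufferSize / 2;      // set the re-request threshold FIRST...
        this.subscription = subscription;
        subscription.request(initialRequestSize); // ...then request (may call onNext synchronously)
    }

    @Override public void onNext(T item) {
        if (--count <= 0)                         // about half of the batch consumed
            subscription.request(count = bufferSize - bufferSize / 2);
        consumer.accept(item);
    }

    @Override public void onError(Throwable ex) { done.completeExceptionally(ex); }
    @Override public void onComplete() { done.complete(null); }
}
```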

Line 325: I think the subscription needs to be cancelled; otherwise the Consumer keeps receiving more elements, which may yield undefined behavior with most subscribers.

Line 361: the consume() method returns a CompletableFuture whose cancel() does nothing, and there is no way to cancel the consumption unless the consumer throws an exception (which currently doesn't work either, due to the issue at line 325). I can see that implementing cancellation is difficult due to the potential asynchrony involved, so at least the javadoc should mention the problem with cancel().
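One way to address both points, sketched under the assumption that consume() is built on a dedicated subscriber (ConsumingSubscriber is a hypothetical name; JDK 9 Flow interfaces assumed): cancel the subscription when the Consumer throws, and also cancel it whenever the returned CompletableFuture completes for any reason. The second part relies on CompletableFuture.cancel() completing the future exceptionally with a CancellationException, which runs whenComplete actions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

final class ConsumingSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<Void> status = new CompletableFuture<>();
    private final Consumer<? super T> consumer;
    private Flow.Subscription subscription;

    ConsumingSubscriber(Consumer<? super T> consumer) { this.consumer = consumer; }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Whenever status completes (normally, exceptionally, or via cancel()), stop upstream.
        status.whenComplete((v, ex) -> subscription.cancel());
        if (!status.isDone())
            subscription.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) {
        try {
            consumer.accept(item);
        } catch (Throwable ex) {
            subscription.cancel();             // stop further deliveries
            status.completeExceptionally(ex);  // report the failure to the caller
        }
    }

    @Override public void onError(Throwable ex) { status.completeExceptionally(ex); }
    @Override public void onComplete() { status.complete(null); }
}
```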

Line 416: the text reads "Preliminary release note: Currently, this method collects all items before executing the stream computation." Correct me if I'm wrong, but since Stream is pull-based, the stream() method would need to inject concurrency into the flow: the push end fills a queue, and the pull end takes (blockingly) from the queue on some other thread.
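A minimal sketch of that push-to-pull bridge, assuming the JDK 9 Flow interfaces (BlockingStreamSubscriber and its batch parameter are hypothetical). The publisher's threads push into a LinkedBlockingQueue; the thread consuming the Stream takes from it, blocking while it is empty, and requests one more item per item pulled, so at most batch items are outstanding.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class BlockingStreamSubscriber<T> implements Flow.Subscriber<T> {
    private static final Object DONE = new Object();   // completion sentinel

    private static final class Failure {                // error wrapper
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final long batch;                            // initial demand
    private volatile Flow.Subscription subscription;

    BlockingStreamSubscriber(long batch) { this.batch = batch; }

    // ---- push side: called by the publisher's threads ----
    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(batch);
    }
    @Override public void onNext(T item) { queue.add(item); }
    @Override public void onError(Throwable t) { queue.add(new Failure(t)); }
    @Override public void onComplete() { queue.add(DONE); }

    // ---- pull side: called by the thread consuming the Stream ----
    Stream<T> stream() {
        Iterator<T> it = new Iterator<T>() {
            private Object next;   // prefetched item, DONE, or Failure (Flow items are never null)

            @Override public boolean hasNext() {
                if (next == null) next = take();
                if (next instanceof Failure)
                    throw new IllegalStateException("upstream failed", ((Failure) next).cause);
                return next != DONE;
            }

            @Override public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                @SuppressWarnings("unchecked") T item = (T) next;
                next = null;
                subscription.request(1);   // replace the item just pulled
                return item;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    private Object take() {
        try {
            return queue.take();           // blocks until the push side delivers something
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new Failure(e);
        }
    }
}
```

The Stream should be consumed on a thread other than the one the publisher uses for delivery; otherwise take() could wait on a signal that only that same thread would produce.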


SubmissionPublisher.java
---------

Line 69: "to share schedulers": I'm not sure about "schedulers" here, because the Flow terminology uses Executors everywhere else.

Line 222: From experience with RxJava, holding a lock while emitting events should be avoided, especially when blocking actions are nearby.

Line 230: in RxJava, if a Subject is terminated with an exception, late Subscribers receive the exception, but here they just get completed. Maybe this exception-replaying behavior is worth considering.

Line 623: getSubscribers(): why would anyone be interested in the actual subscribers? Calling methods on them shouldn't be allowed anyway, and I don't see any other reason to expose them.




--
Best regards,
David Karnok


Re: Upcoming jdk9 j.u.c JEP

Greg Wilkins
In reply to this post by Doug Lea


On 24 July 2015 at 00:23, Doug Lea <[hidden email]> wrote:

> * Reactive-stream users may be disappointed that we do not include any
> net/IO-based Flow.Publisher/Subscriber classes, considering that
> reactive-streams are mainly motivated by net-based frameworks.  The
> reasons for triaging these out are that (1) IO generally falls outside
> of java.util.concurrent (2) Most net-based frameworks seem to use
> custom data representation etc (e.g., JSON) that are even further out
> of scope.  However class SubmissionPublisher can be used as an adaptor
> to turn just about any kind of source into a Publisher, so provides a
> nearly universal way of constructing a good non-custom Publisher even
> from IO-based sources.  (Also notice that SubmissionPublisher can
> serve as the basis of other actor-like frameworks, including those
> turning off back-pressure by calling
> subscription.request(Long.MAX_VALUE) in onSubscribe).



Doug et al,

The Jetty project has been experimenting with the reactive streams API (https://github.com/jetty-project/jetty-reactive), albeit not with the JDK 9 version of it, but inspired by its proposed inclusion.

We very much like the API and what it can bring to our space.  We don't think it needs direct IO support; its power is in bridging domains with a good asynchronous model that supports flow control.

We've also begun some preliminary discussions about developing an RS-based proposal for the Servlet 4.0 specification.  Currently the Servlet API supports asynchronous IO and behaviour well, but the API is deceptively difficult to use correctly and gives no support for back pressure.  With RS we can envisage solutions that look like:
  • A database provides an RS Publisher that delivers the large results of a query asynchronously from a remote database server
  • Some business logic is encapsulated as an RS Processor subscribed to the database Publisher
  • Some framework-provided Processors subscribe to the business logic Processor to perform a chain of functions such as serialization and compression
  • A container-provided Subscriber terminates the chain and sends the resulting bytes out over HTTP/HTTP2 or WebSocket.  The flow control mechanisms of these protocols would be the basis of the RS back pressure.
In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.
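As a rough illustration of one stage in such a chain, here is a sketch of a Processor built on java.util.concurrent.SubmissionPublisher as it shipped in JDK 9; the TransformProcessor name is hypothetical, and a real serialization or compression stage would supply its own function. Back pressure propagates because the stage requests one item at a time from upstream, and submit() blocks while a downstream buffer (sized by maxBufferCapacity) is full, so a slow final Subscriber eventually stalls the source.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class TransformProcessor<S, T> extends SubmissionPublisher<T>
        implements Flow.Processor<S, T> {
    private final Function<? super S, ? extends T> function;
    private Flow.Subscription subscription;

    TransformProcessor(Executor executor, int maxBufferCapacity,
                       Function<? super S, ? extends T> function) {
        super(executor, maxBufferCapacity);
        this.function = function;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        (this.subscription = subscription).request(1); // pull one item at a time
    }

    @Override public void onNext(S item) {
        subscription.request(1);
        submit(function.apply(item)); // blocks while a downstream buffer is full
    }

    @Override public void onError(Throwable ex) { closeExceptionally(ex); } // errors flow downstream only
    @Override public void onComplete() { close(); }
}
```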

However, we have a significant concern with the API: we do not like its error handling design.  Specifically, it is asymmetric: an error in the middle of a chain of processors can be propagated downstream with onError(Throwable), but can only be propagated upstream with cancel().

We believe that cancel without a reason is an insufficient semantic on which to build a robust ecosystem of RS Processors that can be used to build applications.  Consider the above example: it would be ideal if the object serialization were handled by a third-party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent a non-JSON-serializable object, or if the JSON converter was incorrectly configured, then the JSONEncodingProcessor could encounter an error during its onNext(Object item) handling, and its only permitted response is to cancel the stream, without explanation.

I have raised this as an issue on the RS GitHub, and the current recommendation is to log and cancel: https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544   However, I believe that log-and-cancel is an insufficient semantic.  Logging in assembled applications is often fraught, as each component provider will fight over which logging framework is best.  RS chains may cross jurisdictional boundaries, and logs may not even be readily available.

The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.
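To make the proposal concrete, here is a sketch of one shape it could take. ReasonedSubscription and cancelWithReason are hypothetical and are not part of java.util.concurrent.Flow or the Reactive Streams API; the default method keeps existing Subscription implementations compatible, so publishers that don't care about the reason simply fall back to a plain cancel().

```java
import java.util.concurrent.Flow;

/** Hypothetical extension: cancellation that carries an explanation upstream. */
interface ReasonedSubscription extends Flow.Subscription {
    /** Default: publishers that ignore reasons just get a plain cancel(). */
    default void cancel(Throwable reason) { cancel(); }

    /** Helper for a failing stage: pass the reason upstream where it is understood. */
    static void cancelWithReason(Flow.Subscription upstream, Throwable reason) {
        if (upstream instanceof ReasonedSubscription)
            ((ReasonedSubscription) upstream).cancel(reason);
        else
            upstream.cancel(); // all that the standard interface allows
    }
}
```

A JSONEncodingProcessor-style stage could then call ReasonedSubscription.cancelWithReason(upstream, e) from the catch block in its onNext() instead of a bare cancel().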

When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!

cheers

--
Greg Wilkins <[hidden email]>



Re: Upcoming jdk9 j.u.c JEP

Remi Forax
In reply to this post by Doug Lea
Hi Doug,
I've again tried to understand how the Flow API can be safe, and I still do
not understand.

Suppose I want to use the OneShotPublisher with the SampleSubscriber
described in the overview of the class Flow.
Look at the fields of the class SampleSubscriber:
- count is written in onSubscribe(), which is called by the thread that
calls subscribe(), but read in onNext(), which is called by a thread of the
executor service.
- subscription has the same issue: it is initialized by onSubscribe()
and read by onNext().
Maybe it works because ExecutorService.submit() behaves like
Thread.start(), but as far as I know, the javadoc of submit() doesn't
specify that.

Moreover, the API exposes an issue that is not visible in the current
code snippets:
onError() can be called either by the thread that called
subscribe() or by the thread that calls onNext(), so basically onError()
only works if its code has no side effects; ex.printStackTrace() is fine,
but anything beyond logging is not.
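If a particular Publisher cannot be relied on to deliver terminal signals serially, a subscriber can still make its terminal handling run exactly once and publish the outcome safely to other threads. A minimal defensive sketch (OnceTerminatedSubscriber is a hypothetical name, JDK 9 Flow interfaces assumed), assuming nothing about which thread calls onError() or onComplete():

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class OnceTerminatedSubscriber<T> implements Flow.Subscriber<T> {
    private final AtomicBoolean terminated = new AtomicBoolean();
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile Throwable failure;            // null if completed normally

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { /* application logic */ }

    @Override public void onError(Throwable ex) {
        if (terminated.compareAndSet(false, true)) { // only the first terminal signal wins
            failure = ex;                            // written before the latch is released,
            finished.countDown();                    // so any awaiting thread sees it
        }
    }

    @Override public void onComplete() {
        if (terminated.compareAndSet(false, true))
            finished.countDown();
    }

    /** Waits for termination; returns the error, or null on normal completion. */
    Throwable awaitFailure() throws InterruptedException {
        finished.await();
        return failure;
    }
}
```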

I'm sure I'm wrong and there is something subtle that my poor mind
doesn't understand.

Rémi


Re: Upcoming jdk9 j.u.c JEP

David Holmes-6
Hi Remi,

Remi Forax writes:

> Hi Doug,
> I've again try to understand how the Flow API can be safe and i still do
> not understand.
>
> Let suppose i want to use the OneShotPublisher with the SampleSubscriber
> described in the overview of the class Flow.
> If you take a look to the fields of the class SampleSubscriber:
> - count is written in onSubscribe which is called by the thread that
> call subscribe() but read in onNext() which is called by a thread of the
> executor service.
> - subscription has the same issue, it is initialized by onSubscribe()
> and read by onNext().
> Maybe it's working because ExecutorService.submit() works like
> Thread.start() but as far as i know, the javadoc of sumit() doesn't
> specify that.

Memory Consistency Properties

"Actions in a thread prior to the submission of a Runnable to an Executor
happen-before its execution begins. Similarly for Callables submitted to an
ExecutorService."

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
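The guarantee quoted above is what makes plain (non-volatile) fields safe in this kind of hand-off. A small illustration with hypothetical names: a write to a plain field before submit() is visible to the task, and the package documentation gives a matching edge in the other direction, from the task's actions to the code after Future.get() returns. Running it cannot by itself prove the absence of a race; it only shows the pattern the guarantee covers.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVisibilityDemo {
    static final class Holder {
        long count;   // deliberately NOT volatile
    }

    /** Writes a plain field, then reads it back inside a task run by another thread. */
    static long readInTask(long value) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Holder h = new Holder();
            h.count = value;                                  // happens-before...
            Future<Long> f = executor.submit(() -> h.count);  // ...the task's read of h.count
            return f.get();                                   // task's actions happen-before get() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readInTask(42L));
    }
}
```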

Cheers,
David
-----


Re: Upcoming jdk9 j.u.c JEP

Vitaly Davidovich
In reply to this post by Remi Forax

The ExecutorService interface javadoc mentions the memory effects, and notes that the submission process ensures that effects prior to submission are visible in the task itself.

sent from my phone

On Jul 23, 2015 8:36 PM, "Remi Forax" <[hidden email]> wrote:
> Hi Doug,
> I've again try to understand how the Flow API can be safe and i still do not understand.
>
> Let suppose i want to use the OneShotPublisher with the SampleSubscriber described in the overview of the class Flow.
> If you take a look to the fields of the class SampleSubscriber:
> - count is written in onSubscribe which is called by the thread that call subscribe() but read in onNext() which is called by a thread of the executor service.
> - subscription has the same issue, it is initialized by onSubscribe() and read by onNext().
> Maybe it's working because ExecutorService.submit() works like Thread.start() but as far as i know, the javadoc of sumit() doesn't specify that.
>
> Moreover, the API expose an issue that are not visible in the current snippets of code,
> onError() can be called either by the thread that have called subscribe() and the thread that call onNext(), so basically, onError() only works if the code of onError() has no side effect, so ex.printStackTrace is fine but anything but logging is not.
>
> I'm sure i'm wrong and there is something subtle that my poor mind doesn't understand.
>
> Rémi

On 07/23/2015 04:23 PM, Doug Lea wrote:
Thanks to people who have tried out the proposed updates for jdk9
(modest, compared to previous releases) that have been in place since
January -- a few enhancements to CompletableFuture, added class
SubmissionPublisher and its interface-holder class Flow, plus misc
minor updates. If you haven't, try them out from the usual places:

    API specs: http://gee.cs.oswego.edu/dl/jsr166/dist/docs/
    jar file: http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166.jar (compiled using Java8 javac).
    Browsable CVS sources: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/

(As usual, the easiest way is to run with -Xbootclasspath/p:jsr166.jar
but this will almost surely change under upcoming jdk9 module support.)

We've been also working to improve async performance more generally
inside implementations. Things seem to be stabilizing without
controversy. (There has been continuing discussion of Flow interfaces
at http://www.reactive-streams.org/ but nothing that has impacted
these APIs.)  So it's time to actually propose them for jdk9 as an
openjdk JEP.  Which I plan to do within a few days. If you have any
comments or concerns, please let me know!

Some notes:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks. The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent (2) Most net-based frameworks seem to use
custom data representation etc (e.g., JSON) that are even further out
of scope.  However class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe).

* If you are trying out updates with jdk9 early access,
(https://jdk9.java.net/download/) notice that as of b73 last week,
it by default uses the G1 collector, which is (currently,
but hopefully just transiently) not usually the best choice for
high-throughput concurrency. You probably want to use switches
 -XX:+UseParallelGC -XX:+UseCondCardMark -XX:-UseBiasedLocking

* The implementation updates (some not yet committed) focus on
improvements in async contexts (CompletableFutures,
SubmissionPublishers, plus other older components).  Symmetric async
(where agents may be both producers and consumers) performance is
inherently more variable than other cases. ForkJoinPool in particular
handles now these better. Updates include (1) pool.submit and
task.fork now have identical semantics; (2) "Async" FJP mode is now
probabilistically fair; (3) Spare threads cannot accumulate; and (4)
Ramp-down is faster on large systems.  (In fact, of the oddities under
abuse that Ed Harned enjoys identifying, I think the only remaining
one is that we still take a global lock to fill in stack traces for
cross-thread exceptions, which can slow things to a crawl if you have
tons of them, but is still worthwhile in helping developer track down
most problems.)

* It is likely that some implementations (and possibly even new
classes) will be impacted by VarHandles-related work (mainly by Paul
Sandoz), that replaced the syntactically-nicer but too-problematic
"Enhanced Volatiles" proposal (still the same JEP
http://openjdk.java.net/jeps/193).

-Doug

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Upcoming jdk9 j.u.c JEP

Greg Wilkins
In reply to this post by Remi Forax

Remi,

I think the point that you are making is that implementations do have to be careful about which threads they use when calling back (this is not unique to this API, but a common problem with all async APIs).

In our Jetty work, we've encapsulated these concerns in a base class:

https://github.com/jetty-project/jetty-reactive/blob/master/src/main/java/org/eclipse/jetty/reactive/IteratingProcessor.java

Its prime intention is to stop the infinite recursion that can result from request() -> onNext() -> request() -> ... However, it also addresses similar issues, such as using the thread that calls subscribe() to perform work and other callbacks.
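The recursion can be avoided with a "work-in-progress" guard, so that a request() issued from inside onNext() only records demand and returns, leaving the emit loop that is already running to deliver the next item. This is a minimal sketch of that general technique over java.util.concurrent.Flow; it is not Jetty's IteratingProcessor, and RangePublisher and OneAtATime are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class TrampolineDemo {
    // Publishes 0..count-1 synchronously on whichever thread calls request().
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        RangePublisher(int count) { this.count = count; }
        @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new RangeSubscription(subscriber, count));
        }
    }

    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private final AtomicLong demand = new AtomicLong();
        private final AtomicInteger wip = new AtomicInteger(); // work-in-progress guard
        private volatile boolean done;
        private int next; // only touched by the thread currently owning the emit loop

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override public void request(long n) {
            if (n <= 0) {
                if (!done) { done = true; subscriber.onError(new IllegalArgumentException("request must be positive")); }
                return;
            }
            demand.getAndUpdate(d -> d + n < 0 ? Long.MAX_VALUE : d + n); // cap on overflow
            if (wip.getAndIncrement() != 0) {
                return; // an emit loop is already running (possibly lower on this stack)
            }
            int missed = 1;
            do {
                while (!done && next < count && demand.get() > 0) {
                    demand.decrementAndGet();
                    subscriber.onNext(next++); // a re-entrant request() returns immediately
                }
                if (!done && next == count) {
                    done = true;
                    subscriber.onComplete();
                }
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }

        @Override public void cancel() { done = true; }
    }

    // Requests one item at a time from inside onNext: the pattern that would otherwise recurse.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }
        @Override public void onNext(Integer item) {
            items.add(item);
            subscription.request(1);
        }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        OneAtATime subscriber = new OneAtATime();
        new RangePublisher(100_000).subscribe(subscriber);
        System.out.println(subscriber.items.size() + " items, completed=" + subscriber.completed);
    }
}
```

With 100,000 items, the stack depth stays constant instead of growing by two frames per item.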

cheers

On 24 July 2015 at 10:12, Remi Forax <[hidden email]> wrote:
Hi Doug,
I've again tried to understand how the Flow API can be safe, and I still do not understand.

Let's suppose I want to use the OneShotPublisher with the SampleSubscriber described in the overview of the class Flow.
If you look at the fields of the class SampleSubscriber:
- count is written in onSubscribe(), which is called by the thread that calls subscribe(), but read in onNext(), which is called by a thread of the executor service.
- subscription has the same issue: it is initialized by onSubscribe() and read by onNext().
Maybe it works because ExecutorService.submit() works like Thread.start(), but as far as I know, the javadoc of submit() doesn't specify that.

Moreover, the API exposes an issue that is not visible in the current code snippets:
onError() can be called either by the thread that called subscribe() or by the thread that calls onNext(), so basically onError() only works if its code has no side effects; ex.printStackTrace() is fine, but anything beyond logging is not.
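A subscriber that does not want to depend on how a particular publisher hands signals between threads can make its cross-thread state explicit: volatile fields for what onSubscribe writes and onNext reads, and a once-only guard around terminal signals. This is a minimal defensive sketch loosely following the shape of the SampleSubscriber in the Flow documentation; the class name and the refill policy are illustrative assumptions, and it is not a statement of what the Flow specification requires.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class DefensiveSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<? super T> consumer;
    private final long bufferSize;
    private final long refill;
    // volatile: written in onSubscribe (subscribing thread), read in onNext (possibly an executor thread)
    private volatile Flow.Subscription subscription;
    private volatile long count;
    // Only the first terminal signal is acted on, whichever thread delivers it.
    private final AtomicBoolean terminated = new AtomicBoolean();
    final AtomicReference<Throwable> failure = new AtomicReference<>();
    final CountDownLatch done = new CountDownLatch(1);

    DefensiveSubscriber(long bufferSize, Consumer<? super T> consumer) {
        this.bufferSize = bufferSize;
        this.refill = bufferSize - bufferSize / 2; // re-request when about half is consumed
        this.consumer = consumer;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        count = refill;     // publish state before request(), which may trigger onNext at once
        subscription = s;
        s.request(bufferSize);
    }

    @Override public void onNext(T item) {
        // Assumes the publisher serializes onNext calls, so this read-modify-write does not race.
        long c = count - 1;
        if (c <= 0) {
            count = refill;
            subscription.request(refill);
        } else {
            count = c;
        }
        consumer.accept(item);
    }

    @Override public void onError(Throwable ex) {
        if (terminated.compareAndSet(false, true)) {
            failure.set(ex); // side effects here run at most once
            done.countDown();
        }
    }

    @Override public void onComplete() {
        if (terminated.compareAndSet(false, true)) done.countDown();
    }

    public static void main(String[] args) throws Exception {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        DefensiveSubscriber<Integer> subscriber = new DefensiveSubscriber<>(4, seen::add);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < 10; i++) publisher.submit(i);
        }
        subscriber.done.await(10, TimeUnit.SECONDS);
        System.out.println(seen + ", failure=" + subscriber.failure.get());
    }
}
```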

I'm sure I'm wrong and there is something subtle that my poor mind doesn't understand.

Rémi


On 07/23/2015 04:23 PM, Doug Lea wrote:

Re: Upcoming jdk9 j.u.c JEP

Roland Kuhn-2
In reply to this post by Greg Wilkins
Hi Greg,

the reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication; bidirectional conversations would use two such streams running in opposite directions. This means that for a single stream, data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know when and whether to produce the next element(s), hence we didn’t see a use-case for propagating more information than “N elements needed” and “no more elements needed”.

If a single Reactive Stream could transport data upstream then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason why we made this separation lies in not burdening the API designers of conforming implementations with an impossible task: the combinators offered on stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages but with data flowing upstream there would be the need for also describing how to handle that—even if it is “only” an error channel—and since these data flow in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery in the sense that the Publisher would then construct and attach a different Subscriber afterwards—please let me know if you have something else in mind—and if you want to do that then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe it could well be a simpler callback mechanism and might not need full back-pressure.

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.

Regards,

Roland

On 24 Jul 2015, at 00:35, Greg Wilkins <[hidden email]> wrote:



On 24 July 2015 at 00:23, Doug Lea <[hidden email]> wrote:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks.  The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent (2) Most net-based frameworks seem to use
custom data representation etc (e.g., JSON) that are even further out
of scope.  However class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe).



Doug et al,

The Jetty project has been experimenting with the reactive streams API: https://github.com/jetty-project/jetty-reactive (albeit not with the JDK 9 version of it, but inspired by its proposed inclusion).

We very much like the API and what it can bring to our space. We don't think it needs direct IO support; its power is actually in bridging domains with a good asynchronous model that supports flow control.

We've also begun some preliminary discussions about developing an RS-based proposal for the Servlet 4.0 specification. Currently the Servlet API does support asynchronous IO and behaviour well, but the API is deceptively difficult to use correctly and gives no support for back pressure. With RS we can envisage solutions that look like:
  • A database provides an RS Publisher that delivers the large results of a query asynchronously from a remote database server
  • Some business logic is encapsulated as an RS Processor subscribed to the database Publisher
  • Some framework-provided Processors subscribe to the business-logic Processor to perform a chain of functions such as serialization and compression
  • A container-provided Subscriber terminates the chain and sends the resulting bytes out over HTTP/HTTP2 or WebSocket. The flow control mechanisms of these protocols would be the basis of the RS back pressure.
In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.
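The container end of such a chain maps protocol flow control onto request(n). A minimal sketch, assuming a hypothetical onWindowUpdate(credits) hook that stands in for a protocol window update and counts credit in items rather than bytes: the subscriber requests only what the peer has granted, so when the window is exhausted nothing more is pulled from upstream.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class WindowedSinkDemo {
    // HYPOTHETICAL container-side subscriber: demand is driven only by credit from the remote peer.
    static final class WindowedSink implements Flow.Subscriber<String> {
        final List<String> written = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription.complete(s); // no request here: nothing flows until credit arrives
        }

        // Stand-in for a protocol window update (counted in items here, not bytes).
        void onWindowUpdate(long credits) {
            subscription.thenAccept(s -> s.request(credits)); // safe even before onSubscribe runs
        }

        @Override public void onNext(String chunk) { written.add(chunk); } // "write to the connection"
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    public static void main(String[] args) throws Exception {
        WindowedSink sink = new WindowedSink();
        try (SubmissionPublisher<String> upstream = new SubmissionPublisher<>()) {
            upstream.subscribe(sink);
            for (int i = 0; i < 10; i++) upstream.submit("chunk-" + i);
        }
        sink.onWindowUpdate(3);
        Thread.sleep(200);
        System.out.println("after 3 credits: " + sink.written.size() + " chunks written");
        sink.onWindowUpdate(7);
        sink.done.await(10, TimeUnit.SECONDS);
        System.out.println("after 10 credits: " + sink.written);
    }
}
```

Here the undelivered chunks wait in the publisher's buffer. With a smaller buffer, SubmissionPublisher.submit() would block, and that blocking is how the pressure would propagate back toward the source.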

However, we have a significant concern with the API: we do not like its error handling design. Specifically, it is asymmetric: an error in the middle of a chain of processors can be propagated downstream with onError(Throwable) but can only be propagated upstream with cancel().

We believe that cancel without a reason is an insufficient semantic on which to build a robust ecosystem of RS Processors that can be used to build applications. Consider the above example: it would be ideal if the object serialization were handled by a 3rd-party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent a non-JSONable object, or if the JSON converter was incorrectly configured, then the JSONEncodingProcessor could encounter an error during its onNext(Object item) handling, and its only permitted response is to cancel the stream, without explanation.

I have raised this as an issue on the RS GitHub, and the current recommendation is to log and cancel: https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544 However, I believe that log and cancel is an insufficient semantic. Logging in assembled applications is often fraught, as each component provider will fight over which logging framework is best. RS chains may cross jurisdictional boundaries, and logs may not even be readily available.

The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.

When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!
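The asymmetry is easy to see in code. This sketch follows the common pattern of building a Processor on top of SubmissionPublisher and is a hypothetical stand-in for the JSONEncodingProcessor in the example (the encoder function and all names are assumptions). When encoding fails, the exception can travel downstream through onError, but the upstream side receives only a bare cancel().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class EncodingChainDemo {
    // Hypothetical stand-in for "JSONEncodingProcessor".
    static final class EncodingProcessor<S, T> extends SubmissionPublisher<T>
            implements Flow.Processor<S, T> {
        private final Function<? super S, ? extends T> encoder;
        private volatile Flow.Subscription upstream;

        EncodingProcessor(Function<? super S, ? extends T> encoder) { this.encoder = encoder; }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);
        }
        @Override public void onNext(S item) {
            T encoded;
            try {
                encoded = encoder.apply(item);
            } catch (RuntimeException ex) {
                upstream.cancel();      // upstream: no way to say why
                closeExceptionally(ex); // downstream: onError(ex) carries the reason
                return;
            }
            submit(encoded);
            upstream.request(1);
        }
        @Override public void onError(Throwable ex) { closeExceptionally(ex); }
        @Override public void onComplete() { close(); }
    }

    // Runs items through source -> encoder -> sink; returns the error the sink saw, or null.
    static Throwable run(List<?> items) throws Exception {
        CompletableFuture<Throwable> outcome = new CompletableFuture<>();
        EncodingProcessor<Object, String> encoder = new EncodingProcessor<Object, String>(o -> {
            if (o instanceof Number) return "{\"n\":" + o + "}";
            throw new IllegalArgumentException("not JSON-encodable: " + o.getClass().getSimpleName());
        });
        encoder.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String json) { /* the container would write this out */ }
            @Override public void onError(Throwable t) { outcome.complete(t); }
            @Override public void onComplete() { outcome.complete(null); }
        });
        try (SubmissionPublisher<Object> source = new SubmissionPublisher<>()) {
            source.subscribe(encoder);
            for (Object item : items) source.submit(item);
        }
        return outcome.get(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of(1, 2, 3)));         // normal completion
        System.out.println(run(List.of(1, new Object()))); // failure, visible downstream only
    }
}
```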

cheers

--
Greg Wilkins <[hidden email]>

--
I'm a physicist: I have a basic working knowledge of the universe and everything it contains!
    - Sheldon Cooper (The Big Bang Theory)

Re: Upcoming jdk9 j.u.c JEP

Greg Wilkins
Roland,

thanks for the response.

But I don't understand why you consider a terminal exception notified upstream to be a data flow. It is data, but it is not a flow, because it is terminal and cannot be used as a back channel.

Implementations of the API are already required to send data upstream: cancellation is a terminal boolean data state that must be sent upstream, and request(long) is a flow of numbers that must be sent upstream [and as an aside, it is not beyond imagination that request(long) will be misused as a back channel for data; hey, it might even get used to send an error code immediately before or after a cancel!]

Thus I don't see any significant additional complexity in that cancellation having a reason associated with it. Implementations must already support upward-bound data, and any sequencing and/or race conditions that exist with cancel(Throwable) also exist with just cancel().

I also dispute that a Subscriber will be under the control of the Publisher. In the example cited, an application is providing a Processor that is using a Publisher provided by a 3rd-party database and a Subscriber provided by the Servlet container, with perhaps some framework-provided Processors for serialization. In this example there is the possibility of components from at least 4 different code sources being combined in a chain that crosses the deployment administration boundaries of database, application and server. The log & cancel handling of errors is going to be very difficult because many different log mechanisms may be in use and access may not be easily achieved, i.e. application developers may not have full visibility of database logs or servlet container logs.

The errors I'm concerned about are all terminal-style errors, not intended to be a back flow of data nor an acknowledgement of messages sent. It is probable that the implementers of cancel(Throwable) would just log, cancel themselves and pass on the cancel(Throwable) to any of their Subscriptions. However, the point is that this would allow the reason for the failure to cross the administrative boundaries so that it can be known to all.
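To make the proposal concrete: Flow.Subscription has only request(long) and cancel(), so what follows is purely hypothetical. It models cancel(Throwable) as an extension interface whose default falls back to a plain cancel(), and a middle stage that records the reason, cancels, and passes the reason on. Any upstream component that only knows plain cancel() simply loses the reason. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class CancelWithReasonSketch {
    // HYPOTHETICAL: not part of java.util.concurrent.Flow. Models the proposed cancel(Throwable).
    interface ReasonedSubscription extends Flow.Subscription {
        // Implementations that cannot use the reason fall back to an ordinary cancel().
        default void cancel(Throwable reason) { cancel(); }
    }

    // Cancels with a reason where the subscription understands one; otherwise the reason is lost.
    static void cancelWithReason(Flow.Subscription s, Throwable reason) {
        if (s instanceof ReasonedSubscription) {
            ((ReasonedSubscription) s).cancel(reason);
        } else {
            s.cancel();
        }
    }

    // Source-side subscription (e.g. the database's) that records why it was cancelled.
    static final class RecordingSubscription implements ReasonedSubscription {
        volatile boolean cancelled;
        volatile Throwable reason;
        @Override public void request(long n) { /* a real source would emit here */ }
        @Override public void cancel() { cancelled = true; }
        @Override public void cancel(Throwable why) { reason = why; cancel(); }
    }

    // Middle-stage subscription: "log, cancel themselves and pass on the cancel(Throwable)".
    static final class ForwardingSubscription implements ReasonedSubscription {
        private final Flow.Subscription upstream;
        private volatile boolean cancelled;
        final List<String> log = new ArrayList<>(); // stand-in for the stage's own logging
        ForwardingSubscription(Flow.Subscription upstream) { this.upstream = upstream; }
        @Override public void request(long n) { if (!cancelled) upstream.request(n); }
        @Override public void cancel() { cancelled = true; upstream.cancel(); }
        @Override public void cancel(Throwable why) {
            cancelled = true;
            log.add("cancelled because: " + why);
            cancelWithReason(upstream, why);
        }
    }

    public static void main(String[] args) {
        RecordingSubscription database = new RecordingSubscription();
        ForwardingSubscription processor = new ForwardingSubscription(database);
        cancelWithReason(processor, new IllegalArgumentException("non-JSONable item"));
        System.out.println(database.cancelled + " " + database.reason);
    }
}
```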

I think that any argument that can be made for not sending a Throwable upstream can equally be made for not sending one downstream (or for not having any exceptions in the java language).   Exceptions are very rarely handled in any meaningful way, but are extremely useful for passing details of a failure so that they may be known to all who may need to know.   

Without exceptions, I'm imagining many, many Stack Overflow questions like "Why was my Subscription cancelled?" followed by the obligatory "RTFLog Stupid!" responses!

cheers

On 24 July 2015 at 15:55, Roland Kuhn <[hidden email]> wrote:

Re: Upcoming jdk9 j.u.c JEP

Roland Kuhn-2
Hi Greg,

my reply has obviously opened two different discussions (namely “why are things as they are?” and “what is the suggested change all about?”); I think it would be most fruitful if we stash the first one for now and come back to it after the second one has been understood better, at least by myself. That will put us in a better position to judge the big picture.

Considering the flow of data from the DB via application and framework processors into the Servlet container, at any point along this line failures can happen. The component emitting the failure will use whatever means it has outside of Reactive Streams to log/audit/monitor and provide metrics, I assume that that is just part of all reasonable code; the database will do that, the application will do it, the framework will probably allow the application to configure how to do that, and the application server will be configured how to do that. This means that everyone can debug their own failures.

Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction. This also allows downstream components to see failures coming from upstream, but this is a byproduct of needing to generate the right kind of final response to the external client. Now the interesting question is: why would the database need to know that some downstream component choked on the data it emitted? How exactly would this information be used by the database or its operators/programmers? Arguably the data exist and are “correct” by definition, guarded by Java types, and any validation errors that occur are not stream failures (cf. this definition) and should be treated as normal data elements and sent downstream (or filtered out, depending on the requirements & protocol).
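Roland's alternative, treating validation failures as ordinary elements rather than stream failures, might look like this minimal sketch. Validated and encode are hypothetical names, and a real protocol would decide whether invalid elements are forwarded, filtered, or turned into a particular response.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ValidatedElements {
    // A stream element that is either an encoded value or the reason it could not be encoded.
    static final class Validated<T> {
        final T value;
        final String error;
        private Validated(T value, String error) { this.value = value; this.error = error; }
        static <T> Validated<T> ok(T value) { return new Validated<>(value, null); }
        static <T> Validated<T> invalid(String error) { return new Validated<>(null, error); }
        boolean isValid() { return error == null; }
        @Override public String toString() {
            return isValid() ? "ok(" + value + ")" : "invalid(" + error + ")";
        }
    }

    // Hypothetical encoder: bad input becomes a data element, so the stream itself keeps going.
    static Validated<String> encode(Object item) {
        if (item instanceof Number) return Validated.ok(item.toString());
        if (item instanceof String) return Validated.ok("\"" + item + "\"");
        return Validated.invalid("not encodable: " + item.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        List<Validated<String>> out = List.of(1, "a", new Object()).stream()
                .map(ValidatedElements::encode)
                .collect(Collectors.toList());
        System.out.println(out);
    }
}
```

In a Processor like the JSON encoder in Greg's example, onNext would submit encode(item) downstream instead of cancelling, leaving onError for failures of the stream itself.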

I am deliberately painting with high contrast colors here in order to better understand what exactly it is that you want to achieve instead of just discussing the proposed solution, thanks for your patience!

Regards,

Roland

On 24 Jul 2015, at 08:31, Greg Wilkins <[hidden email]> wrote:

Roland,

thanks for the response.

But I don't understand why you consider a terminal exception being notified upstream as a data flow?   It is data, but it is not a flow because it is terminal and cannot be used as a back channel.

Implementations of the API are already required to send data upstream:  Cancellation is a terminal boolean data state that must be sent upstream, and request(int) is a flow of integers that must be sent upstream [and as an aside, it is not beyond imagination that request(int) will be misused as a back channel for data - hey it might even get used to send an error code immediately prior/post to a cancel! ]

Thus I don't see that there is any significant additional complexity with that cancellation having a reason associated with it.   Implementations must already support upward bound data and any sequencing and/or race conditions that exist with cancel(Throwable) also exist with just cancel().
 
I also dispute that a Subscriber will be under the control of the Publisher.     In the example cited and application is providing a Processor, that is using a Publisher provided by a 3rd party database and an Subscriber provided by the Servlet container, with perhaps some framework provided Processors for serialization.   In this example there is the possibility of components from at least 4 difference code sources being combined in a chain that crosses deployment administration boundaries of: database, application and server.     The log & cancel handling of errors is going to be very difficult because many different log mechanism may be in use and access may not be easily achieved.  ie applications developers may not have full viability of database logs or servlet container logs.

The type of error I'm concerned about are all terminal style errors and not intended to be a back flow of data, nor acknowledgement of messages sent.   It is probably that the implementers of cancel(Throwable) would just log, cancel themselves and pass on the cancel(Throwable) to any of their Subscripions.   However the point being that would allow the reason for the failure to cross the administrative boundaries so that it can be known to all.

I think that any argument that can be made for not sending a Throwable upstream can equally be made for not sending one downstream (or for not having any exceptions in the java language).   Exceptions are very rarely handled in any meaningful way, but are extremely useful for passing details of a failure so that they may be known to all who may need to know.   

Without exceptions  I'm imagining many many  stack over flow questions like "Why was my Subscription cancelled?" followed by obligatory "RTFLog Stupid!" responses!

cheers

















On 24 July 2015 at 15:55, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

the reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication, bidirectional conversations would utilize two such streams running in opposite directions. This means that for a single stream data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know about when and if to produce the next element(s), hence we didn’t see a use-case for propagating more information than “N elements needed” and “no more elements needed”. 

If a single Reactive Stream could transport data upstream then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason why we made this separation lies in not burdening the API designers of conforming implementations with an impossible task: the combinators offered on stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages but with data flowing upstream there would be the need for also describing how to handle that—even if it is “only” an error channel—and since these data flow in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery in the sense that the Publisher would then construct and attach a different Subscriber afterwards—please let me know if you have something else in mind—and if you want to do that then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe it could well be a simpler callback mechanism and might not need full back-pressure.

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.

Regards,

Roland

24 jul 2015 kl. 00:35 skrev Greg Wilkins <[hidden email]>:



On 24 July 2015 at 00:23, Doug Lea <[hidden email]> wrote:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks.  The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent (2) Most net-based frameworks seem to use
custom data representation etc (e.g., JSON) that are even further out
of scope.  However class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe).



Doug et al,

The Jetty project has been experimenting with the Reactive Streams API (https://github.com/jetty-project/jetty-reactive), albeit not with the JDK 9 version of it, but inspired by its proposed inclusion.

We very much like the API and what it can bring to our space. We don't see that it needs direct IO support; its power is actually in bridging domains with a good asynchronous model that supports flow control.

We've also begun some preliminary discussions about developing an RS-based proposal for the Servlet 4.0 specification. Currently the Servlet API does support asynchronous IO and behaviour well, but the API is deceptively difficult to use correctly and gives no support for back pressure. With RS we can envisage solutions that look like:
  • A database provides an RS Publisher that delivers the large results of a query asynchronously from a remote database server.
  • Some business logic is encapsulated as an RS Processor subscribed to the database Publisher.
  • Some framework-provided Processors subscribe to the business logic Processor to perform a chain of functions such as serialization and compression.
  • A container-provided Subscriber terminates the chain and sends the resulting bytes out over HTTP/HTTP2 or WebSocket. The flow control mechanisms of these protocols would be the basis of the RS back pressure.
In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.
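The chain described above can be approximated with JDK 9 `Flow` types. In this sketch each stage extends `SubmissionPublisher`, requests one item at a time from upstream and republishes a transformed item; because `submit` blocks while a downstream buffer is full, a slow terminal subscriber eventually stalls upstream demand. The database, business logic, serializer and container are stand-ins (string functions and an in-memory collector), and `TransformProcessor` and `PipelineDemo` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// One stage of the chain: a Subscriber upstream, a Publisher downstream.
class TransformProcessor<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> fn;
    private Flow.Subscription upstream;

    TransformProcessor(Function<? super T, ? extends R> fn) {
        this.fn = fn;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        submit(fn.apply(item)); // may block while a downstream buffer is full
        upstream.request(1);    // only then ask upstream for the next item
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

class PipelineDemo {
    static List<String> run(List<String> rows) {
        List<String> out = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        TransformProcessor<String, String> logic =
                new TransformProcessor<String, String>(String::toUpperCase);   // "business logic"
        TransformProcessor<String, String> encode =
                new TransformProcessor<String, String>(s -> "\"" + s + "\""); // "serializer"
        try (SubmissionPublisher<String> db = new SubmissionPublisher<>()) { // "database"
            db.subscribe(logic);
            logic.subscribe(encode);
            encode.subscribe(new Flow.Subscriber<String>() {                 // "container"
                private Flow.Subscription sub;
                @Override public void onSubscribe(Flow.Subscription s) { sub = s; s.request(1); }
                @Override public void onNext(String item) { out.add(item); sub.request(1); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (String row : rows) {
                db.submit(row);
            }
        } // closing the source lets onComplete ripple down the chain
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

In a real container the terminal subscriber would call `request` only when the HTTP/2 flow-control window allows it. That is where the back pressure would originate.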

However, we have a significant concern with the API: we do not like its error-handling design. Specifically, it is asymmetric. An error in the middle of a chain of processors can be propagated downstream with onError(Throwable) but can only be propagated upstream with cancel().

We believe that cancel without a reason is an insufficient semantic on which to build a robust ecosystem of RS Processors for assembling applications. Consider the above example: it would be ideal if the object serialization was handled by a 3rd-party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent a non-JSON-able object, or if the JSON converter was incorrectly configured, then the JSONEncodingProcessor could encounter an error during its onNext(Object item) handling, and its only permitted handling of that is to cancel the stream, without explanation.
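Under the current API, a serialization stage like the JSONEncodingProcessor can do roughly the following with a bad item: cancel its upstream Subscription, which carries no reason, and fail its own downstream subscribers with the exception. The sketch is a hypothetical stand-in. `EncodingProcessor`, its toy `encode` rule (strings and numbers only, no escaping) and `EncodingDemo` are assumptions, not a real JSON library.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class EncodingProcessor extends SubmissionPublisher<String>
        implements Flow.Processor<Object, String> {
    private Flow.Subscription upstream;
    private boolean failed;

    // Toy encoder: strings and numbers only, no escaping.
    static String encode(Object item) {
        if (item instanceof String) return "\"" + item + "\"";
        if (item instanceof Number) return item.toString();
        throw new IllegalArgumentException("not encodable: " + item.getClass().getName());
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1);
    }

    @Override public void onNext(Object item) {
        if (failed) return; // ignore late items that arrive after we cancelled
        String json;
        try {
            json = encode(item);
        } catch (IllegalArgumentException e) {
            failed = true;
            upstream.cancel();     // upstream learns only "stop", not why
            closeExceptionally(e); // downstream receives the reason via onError
            return;
        }
        submit(json);
        upstream.request(1);
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

class EncodingDemo {
    // Returns the Throwable seen by the downstream subscriber (null on success).
    static Throwable run(List<Object> items) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        EncodingProcessor enc = new EncodingProcessor();
        try (SubmissionPublisher<Object> source = new SubmissionPublisher<>()) {
            source.subscribe(enc);
            enc.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String json) { }
                @Override public void onError(Throwable t) { error.set(t); done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (Object item : items) {
                source.submit(item);
            }
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return error.get();
    }
}
```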

I have raised this as an issue on the RS GitHub, and the current recommendation is to log and cancel (https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544). However, I believe that log-and-cancel is an insufficient semantic. Logging in assembled applications is often fraught, as each component provider will fight over which logging framework is best. RS chains may cross jurisdictional boundaries, and logs may not even be readily available.

The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.
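One way to read the proposed `cancel(Throwable reason)` is as an optional extension of `Flow.Subscription` whose default simply falls back to `cancel()`, so publishers that don't care can ignore the reason. This is purely hypothetical: `ReasonedSubscription`, `RecordingSubscription` and `CancelWithReason` exist neither in the JDK nor in Reactive Streams. The sketch also says nothing about how a reason would be forwarded further upstream.

```java
import java.util.concurrent.Flow;

// HYPOTHETICAL extension; not part of java.util.concurrent.Flow.
interface ReasonedSubscription extends Flow.Subscription {
    // Publishers that do not care about the reason inherit plain cancellation.
    default void cancel(Throwable reason) {
        cancel();
    }
}

// Hypothetical subscription that remembers the reason it was given.
class RecordingSubscription implements ReasonedSubscription {
    volatile boolean cancelled;
    volatile Throwable reason;

    @Override public void request(long n) { /* demand handling omitted in this sketch */ }
    @Override public void cancel() { cancelled = true; }
    @Override public void cancel(Throwable reason) {
        this.reason = reason;
        cancel();
    }
}

class CancelWithReason {
    // Subscriber-side helper that still works with ordinary Flow.Subscriptions.
    static void cancel(Flow.Subscription s, Throwable reason) {
        if (s instanceof ReasonedSubscription) {
            ((ReasonedSubscription) s).cancel(reason);
        } else {
            s.cancel(); // the reason is lost, as with today's API
        }
    }
}
```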

When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!

cheers

--
Greg Wilkins <[hidden email]>

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

--
I'm a physicist: I have a basic working knowledge of the universe and everything it contains!
    - Sheldon Cooper (The Big Bang Theory)




Re: Upcoming jdk9 j.u.c JEP

Greg Wilkins
Roland,

thanks for engaging in this discussion, and I like high-contrast colours.

Let me pick up on a number of key phrases in your response:

> "everyone can debug their own failures"

The location that a failure occurs does not imply ownership/blame of that failure.  I can think of lots of examples:
  • A publisher calls onNext more times than a request(n) call allowed. This is a specification-compliance problem of the publisher that is detected in the subscriber. The subscriber does not own this problem and thus cannot debug it. The publisher needs to be told of an IllegalStateException, which can't be thrown from the onNext call, as that would make the subscriber violate the specification.
  • A GzipInflator processor receives a tiny message that inflates to an enormous buffer. Either the OOME or, preferably, some limit-violation exception should be sent to the source of the too-large message.
  • A SerializingProcessor receives an object that is not serializable. Again, this is an error of the publisher, not the processor.
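The first case can be illustrated with a subscriber that counts outstanding demand. It must not throw from `onNext`, so with today's API it can only record the violation and cancel, and the publisher never learns why. `DemandCheckingSubscriber` and `DemandDemo` are hypothetical names, and the misbehaving publisher is simulated by calling `onNext` directly.

```java
import java.util.concurrent.Flow;

// Tracks items requested but not yet delivered. The spec serializes signals
// to one subscriber, so plain fields suffice inside the signal methods.
class DemandCheckingSubscriber<T> implements Flow.Subscriber<T> {
    private final long demand;
    private long outstanding;
    private Flow.Subscription subscription;
    private Throwable violation;

    DemandCheckingSubscriber(long demand) { this.demand = demand; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        outstanding = demand;
        s.request(demand); // asks for exactly `demand` items, once
    }

    @Override public void onNext(T item) {
        if (violation != null) return; // already cancelled; ignore late items
        if (outstanding == 0) {
            // Throwing here would make this subscriber non-compliant, so record and cancel.
            violation = new IllegalStateException("onNext beyond requested demand");
            subscription.cancel();
            return;
        }
        outstanding--;
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }

    Throwable violation() { return violation; }
}

class DemandDemo {
    // Simulates a broken publisher that delivers 3 items against request(2).
    static Throwable run() {
        DemandCheckingSubscriber<Integer> sub = new DemandCheckingSubscriber<>(2);
        sub.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        for (int i = 1; i <= 3; i++) {
            sub.onNext(i);
        }
        return sub.violation();
    }
}
```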
> " why would the database need to know that some downstream component choked on the data it emitted?"

In the example that I described, the database would not need to know about the exception and would be perfectly able to ignore the associated reason. However, note that in that example there was an ApplicationProcessor between the JsonProcessor and the database publisher. There it is probably the fault of the ApplicationProcessor, which forwarded a DB object on to the JsonProcessor when it should not have. A good implementation of the application would intercept the resulting JSONException in the ApplicationProcessor and pass on either null or some ApplicationException to the database publisher.

> " Arguably the data exist and are “correct” by definition"

Database developers are not beyond making errors, especially when implementing new asynchronous Reactive Streams clients. Just because the database calls onNext does not mean that it is correctly implementing the RS specification. Just because it provides an object does not mean that the object is entirely correct. A database can be corrupted and could, for example, send an object containing a string with illegal Unicode code points.

In general, if a subscriber has some assumptions about the data a publisher should emit and those assumptions are not met, then either the subscriber is wrong or the publisher is wrong. It is beyond the scope of the API to resolve who is at fault, so it is best to tell both parties and let each work out who is to blame. Assuming that publishers are always faultless does not reflect reality.


> "Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction."

I can bounce your earlier question back at you: why would an HTTPResponseSubscriber need to know the specific reason that some upstream component choked before it finished emitting all its data? Mostly, subscribers don't really care about the reason and just need to know whether the termination was abnormal, so onError() would do just as well as onError(Throwable)... except in the few special cases when it does matter, and perhaps the HTTPResponseSubscriber wants to look at the exception to decide whether it should send a 500, 503 or 403, a chunk trailer, etc.
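The special case Greg mentions, a response subscriber inspecting the exception passed to `onError`, might look like this. The mapping is a hypothetical policy. Which exception types deserve a 403 or 503, and the choice to return no status once the response is committed (where a trailer or connection abort would be needed instead), are assumptions for illustration.

```java
import java.util.OptionalInt;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

class ErrorToStatus {
    // Hypothetical policy for an HTTP response subscriber's onError(Throwable).
    static OptionalInt statusFor(Throwable t, boolean responseCommitted) {
        if (responseCommitted) {
            // Headers are already sent; a status code can no longer be chosen.
            return OptionalInt.empty();
        }
        if (t instanceof SecurityException) {
            return OptionalInt.of(403);
        }
        if (t instanceof RejectedExecutionException || t instanceof TimeoutException) {
            return OptionalInt.of(503); // overload or timeout: "try again later"
        }
        return OptionalInt.of(500);
    }
}
```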

This type of argument applies equally to upstream and downstream. For the most part the reason throwable is ignorable in both directions, however there are always going to be special cases when it is meaningful.

I contend that it is better to ignore a meaningless reason than to be ignorant of a meaningful one.

cheers






On 24 July 2015 at 17:12, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

my reply has obviously opened two different discussions (namely “why are things as they are?” and “what is the suggested change all about”); I think it would be most fruitful if we stash the first one for now and come back to it after the second one has been understood better, at least by myself. That will put us into a better situation for judging the big picture.

Considering the flow of data from the DB via application and framework processors into the Servlet container, at any point along this line failures can happen. The component emitting the failure will use whatever means it has outside of Reactive Streams to log/audit/monitor and provide metrics, I assume that that is just part of all reasonable code; the database will do that, the application will do it, the framework will probably allow the application to configure how to do that, and the application server will be configured how to do that. This means that everyone can debug their own failures.

Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction. This also allows downstream components to see failures coming from upstream, but this is a byproduct of needing to generate the right kind of final response to the external client. Now the interesting question is: why would the database need to know that some downstream component choked on the data it emitted? How exactly would this information be used by the database or its operators/programmers? Arguably the data exist and are “correct” by definition, guarded by Java types, and any validation errors that occur are not stream failures (cf. this definition) and should be treated as normal data elements and sent downstream (or filtered out, depending on the requirements & protocol).
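Roland's alternative, treating validation failures as ordinary elements rather than stream failures, amounts to widening the element type. A minimal sketch: `Checked` is a hypothetical result type, the "parse an integer" rule is an assumption, and `SubmissionPublisher.consume` (JDK 9) serves only as a convenient terminal stage.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical per-element outcome: exactly one of value/problem is non-null.
final class Checked<T> {
    final T value;
    final String problem;

    private Checked(T value, String problem) {
        this.value = value;
        this.problem = problem;
    }
    static <T> Checked<T> ok(T value) { return new Checked<>(value, null); }
    static <T> Checked<T> invalid(String problem) { return new Checked<>(null, problem); }
    boolean isValid() { return problem == null; }
}

class ValidationAsData {
    static Checked<Integer> parse(String raw) {
        try {
            return Checked.ok(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return Checked.invalid("not a number: " + raw);
        }
    }

    // Invalid rows travel downstream as elements; the stream still completes normally.
    static List<Checked<Integer>> run(List<String> rows) {
        List<Checked<Integer>> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Checked<Integer>> pub = new SubmissionPublisher<>();
        CompletableFuture<Void> done = pub.consume(out::add);
        for (String row : rows) {
            pub.submit(parse(row));
        }
        pub.close();
        done.join(); // completes once onComplete has been delivered
        return out;
    }
}
```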

I am deliberately painting with high contrast colors here in order to better understand what exactly it is that you want to achieve instead of just discussing the proposed solution, thanks for your patience!

Regards,

Roland

24 jul 2015 kl. 08:31 skrev Greg Wilkins <[hidden email]>:

Roland,

thanks for the response.

But I don't understand why you consider a terminal exception notified upstream to be a data flow. It is data, but it is not a flow, because it is terminal and cannot be used as a back channel.

Implementations of the API are already required to send data upstream: cancellation is a terminal boolean data state that must be sent upstream, and request(long) is a flow of integers that must be sent upstream [and as an aside, it is not beyond imagination that request(long) will be misused as a back channel for data; hey, it might even get used to send an error code immediately before or after a cancel!]

Thus I don't see any significant additional complexity in cancellation having a reason associated with it. Implementations must already support upward-bound data, and any sequencing and/or race conditions that exist with cancel(Throwable) also exist with plain cancel().
 
I also dispute that a Subscriber will be under the control of the Publisher. In the example cited, an application provides a Processor that uses a Publisher provided by a 3rd-party database and a Subscriber provided by the Servlet container, with perhaps some framework-provided Processors for serialization. In this example, components from at least 4 different code sources may be combined in a chain that crosses the deployment and administration boundaries of database, application and server. Log-and-cancel handling of errors is going to be very difficult, because many different log mechanisms may be in use and access may not be easily achieved; i.e., application developers may not have full visibility of database logs or servlet container logs.

The errors I'm concerned about are all terminal-style errors, not intended to be a back flow of data nor an acknowledgement of messages sent. It is probable that implementers of cancel(Throwable) would just log, cancel themselves and pass the cancel(Throwable) on to any of their Subscriptions. However, the point is that this would allow the reason for the failure to cross the administrative boundaries so that it can be known to all.

I think that any argument that can be made for not sending a Throwable upstream can equally be made for not sending one downstream (or for not having any exceptions in the Java language). Exceptions are very rarely handled in any meaningful way, but are extremely useful for passing details of a failure so that they may be known to all who may need to know.

Without exceptions, I'm imagining many, many Stack Overflow questions like "Why was my Subscription cancelled?" followed by the obligatory "RTFLog Stupid!" responses!

cheers

















On 24 July 2015 at 15:55, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

the reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication; bidirectional conversations would use two such streams running in opposite directions. This means that for a single stream, data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know when and whether to produce the next element(s), hence we didn’t see a use case for propagating more information than “N elements needed” and “no more elements needed”.
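The two upstream signals Roland describes map directly onto `Flow.Subscription`: `request(n)` for "N elements needed" and `cancel()` for "no more elements needed". A small sketch, assuming a hypothetical `BoundedSubscriber` that asks for elements in batches and cancels once it has taken enough; the batch size and limit are arbitrary.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BoundedSubscriber implements Flow.Subscriber<Integer> {
    private final int batch;
    private final int limit;
    final List<Integer> taken = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private int sinceRequest;
    private boolean finished;

    BoundedSubscriber(int batch, int limit) {
        this.batch = batch;
        this.limit = limit;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(batch);                // "N elements needed"
    }

    @Override public void onNext(Integer item) {
        if (finished) return;            // tolerate late items after cancel()
        taken.add(item);
        if (taken.size() == limit) {
            finished = true;
            subscription.cancel();       // "no more elements needed"
            done.countDown();
        } else if (++sinceRequest == batch) {
            sinceRequest = 0;
            subscription.request(batch); // replenish demand one batch at a time
        }
    }

    @Override public void onError(Throwable t) { finished = true; done.countDown(); }
    @Override public void onComplete() { finished = true; done.countDown(); }
}

class BoundedDemo {
    static List<Integer> run(int batch, int limit, int available) {
        BoundedSubscriber sub = new BoundedSubscriber(batch, limit);
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (int i = 1; i <= available; i++) {
                pub.submit(i);
            }
        }
        try {
            sub.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sub.taken;
    }
}
```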

If a single Reactive Stream could transport data upstream then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason why we made this separation lies in not burdening the API designers of conforming implementations with an impossible task: the combinators offered on stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages but with data flowing upstream there would be the need for also describing how to handle that—even if it is “only” an error channel—and since these data flow in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery in the sense that the Publisher would then construct and attach a different Subscriber afterwards—please let me know if you have something else in mind—and if you want to do that then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe it could well be a simpler callback mechanism and might not need full back-pressure.

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.

Regards,

Roland


Re: Upcoming jdk9 j.u.c JEP

Viktor Klang
In reply to this post by Roland Kuhn-2
This is a great discussion on the rationale for unidirectional fatal error propagation with Reactive Streams. May I be so bold as to request that we move it to the aforementioned RS issue, so that other parties who may not frequent this fine mailing list can stitch together the conversation that led to the final conclusion of this discussion?

On Fri, Jul 24, 2015 at 9:12 AM, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

my reply has obviously opened two different discussions (namely “why are things as they are?” and “what is the suggested change all about”), I think it would be most fruitful if we stash the first one for now and come back to it after the second one has been understood better—at least by myself. That will put us into a better situation for judging the big picture.

Considering the flow of data from the DB via application and framework processors into the Servlet container, at any point along this line failures can happen. The component emitting the failure will use whatever means it has outside of Reactive Streams to log/audit/monitor and provide metrics, I assume that that is just part of all reasonable code; the database will do that, the application will do it, the framework will probably allow the application to configure how to do that, and the application server will be configured how to do that. This means that everyone can debug their own failures.

Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction. This also allows downstream components to see failures coming from upstream, but this is a byproduct of needing to generate the right kind of final response to the external client. Now the interesting question is: why would the database need to know that some downstream component choked on the data it emitted? How exactly would this information be used by the database or its operators/programmers? Arguably the data exist and are “correct” by definition, guarded by Java types, and any validation errors that occur are not stream failures (cf. this definition) and should be treated as normal data elements and sent downstream (or filtered out, depending on the requirements & protocol).

I am deliberately painting with high contrast colors here in order to better understand what exactly it is that you want to achieve instead of just discussing the proposed solution, thanks for your patience!

Regards,

Roland

24 jul 2015 kl. 08:31 skrev Greg Wilkins <[hidden email]>:

Roland,

thanks for the response.

But I don't understand why you consider a terminal exception being notified upstream as a data flow?   It is data, but it is not a flow because it is terminal and cannot be used as a back channel.

Implementations of the API are already required to send data upstream:  Cancellation is a terminal boolean data state that must be sent upstream, and request(int) is a flow of integers that must be sent upstream [and as an aside, it is not beyond imagination that request(int) will be misused as a back channel for data - hey it might even get used to send an error code immediately prior/post to a cancel! ]

Thus I don't see that there is any significant additional complexity with that cancellation having a reason associated with it.   Implementations must already support upward bound data and any sequencing and/or race conditions that exist with cancel(Throwable) also exist with just cancel().
 
I also dispute that a Subscriber will be under the control of the Publisher.     In the example cited and application is providing a Processor, that is using a Publisher provided by a 3rd party database and an Subscriber provided by the Servlet container, with perhaps some framework provided Processors for serialization.   In this example there is the possibility of components from at least 4 difference code sources being combined in a chain that crosses deployment administration boundaries of: database, application and server.     The log & cancel handling of errors is going to be very difficult because many different log mechanism may be in use and access may not be easily achieved.  ie applications developers may not have full viability of database logs or servlet container logs.

The type of error I'm concerned about are all terminal style errors and not intended to be a back flow of data, nor acknowledgement of messages sent.   It is probably that the implementers of cancel(Throwable) would just log, cancel themselves and pass on the cancel(Throwable) to any of their Subscripions.   However the point being that would allow the reason for the failure to cross the administrative boundaries so that it can be known to all.

I think that any argument that can be made for not sending a Throwable upstream can equally be made for not sending one downstream (or for not having any exceptions in the java language).   Exceptions are very rarely handled in any meaningful way, but are extremely useful for passing details of a failure so that they may be known to all who may need to know.   

Without exceptions  I'm imagining many many  stack over flow questions like "Why was my Subscription cancelled?" followed by obligatory "RTFLog Stupid!" responses!

cheers

















On 24 July 2015 at 15:55, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

the reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication, bidirectional conversations would utilize two such streams running in opposite directions. This means that for a single stream data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know about when and if to produce the next element(s), hence we didn’t see a use-case for propagating more information than “N elements needed” and “no more elements needed”. 

If a single Reactive Stream could transport data upstream then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason why we made this separation lies in not burdening the API designers of conforming implementations with an impossible task: the combinators offered on stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages but with data flowing upstream there would be the need for also describing how to handle that—even if it is “only” an error channel—and since these data flow in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery in the sense that the Publisher would then construct and attach a different Subscriber afterwards—please let me know if you have something else in mind—and if you want to do that then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe it could well be a simpler callback mechanism and might not need full back-pressure.

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.

Regards,

Roland

24 jul 2015 kl. 00:35 skrev Greg Wilkins <[hidden email]>:



On 24 July 2015 at 00:23, Doug Lea <[hidden email]> wrote:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks.  The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent (2) Most net-based frameworks seem to use
custom data representation etc (e.g., JSON) that are even further out
of scope.  However class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe).



Doug et al,

The Jetty project has been experimenting with the reactive streams API: https://github.com/jetty-project/jetty-reactive albiet not with the JDK-9 version of it, but inspired by the proposed inclusion of it.

We very much like the API and what it can bring to our space.  We don't see that it needs direct IO support and that it's power is actually bridging domains with a good asynchronous model that supports flow control.  

We've also begun some preliminary discussions about developing RS based proposal for the Servlet 4.0 specification.    Currently the Servlet API does well support asynchronous IO and behaviour, but the API is deceptively difficult to use correctly and gives no support for back pressure.   With RS's we can envisage solutions that look like:
  • A database provides a RS Producer that provides the large results of a query asynchronously from a remote database server
  • Some business logic is encapsulated as a RS Processor subscribed to the database producer
  • Some framework provided  Porocessors subscribe to the business logic Processor to perform a chain of functions such as serialization, compression
  • A container provided Subscriber terminates the chain and sends the resulting byte out over HTTP/HTTP2 or Websocket.   The flow control mechanisms of these protocols would be the basis of the RS back pressure. 
In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.

However, we have a significant concern with the API in that we do not like it's error handling design.  Specifically that it is asymmetric and an error in the middle of a chain of processors can be propagated downstream with onError(Throwable) but can only be propagated upstream with cancel().

We believe that cancel without reason is an insufficient semantic to build a robust ecosystem of RS Processors that can be used to build applications.   Consider the above example, it would be ideal if the object serialization was handled by a 3rd party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent an non-jsonable object, or if the JSON converter was incorrectly configured then the JSONEcondiingProcessor could encounter an error during its onNext(Object item) handling and it's only permitted handling of that is to cancel the stream, without explanation.

I have raised this as an issue on the RS github and it the current recommendation is to log and cancel: https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544   However I believe that log and cancel is a insufficient semantic.   Logging in assembled applications is often fraught as each component provider will fight over which logging framework is best.  RS chains may cross jurisdictional boundaries and logs may not even be readily available.

The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.

When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!

cheers

--
Greg Wilkins <[hidden email]>

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

--
I'm a physicist: I have a basic working knowledge of the universe and everything it contains!
    - Sheldon Cooper (The Big Bang Theory)



--
Cheers,


Re: Upcoming jdk9 j.u.c JEP

Doug Lea
In reply to this post by Dávid Karnok
On 07/23/2015 02:55 PM, Dávid Karnok wrote:

> I've read through the code and javadoc and I have the following comments:

Thanks! I addressed all except the ones discussed below:

> Line 70: the code schedules the exception with executor.execute(() ->
> subscriber.onError(ex)); but the subscriber can't cancel this action. For
> example, a subscriber calling request(-1) and then cancel() will inevitably
> receive the exception. I understand the RS spec allows for such late-coming
> events, but since this is an example, it should convey the right practices: one
> should always get the future of a scheduled action.

This is illustrated with the simple OneShot version to minimize sample code
size of a minimal fully compliant implementation at the expense of
glossing over nuances. Unless you see a trivial change that would convey
this, just adding something to more nicely address { request(-1); cancel()}
would probably impair understandability more than it would help.
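A sketch of what Dávid suggests, assuming an ExecutorService rather than a plain Executor: keep the Future returned by submit() for the scheduled onError, so that cancel() can try to suppress it. The class is hypothetical and shows only the error path; suppression is best effort, because Future.cancel has no effect once onError has started running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: only the error path of a one-shot subscription. */
class OneShotErrorSubscription implements Flow.Subscription {
    private final Flow.Subscriber<?> subscriber;
    private final ExecutorService executor;
    private Future<?> pending; // guarded by this
    private boolean done;      // guarded by this

    OneShotErrorSubscription(Flow.Subscriber<?> subscriber, ExecutorService executor) {
        this.subscriber = subscriber;
        this.executor = executor;
    }

    @Override public synchronized void request(long n) {
        if (done) return;
        if (n <= 0) {
            done = true;
            IllegalArgumentException ex = new IllegalArgumentException("non-positive request: " + n);
            pending = executor.submit(() -> subscriber.onError(ex)); // keep the Future
        }
        // (delivering the single item for n > 0 is omitted in this sketch)
    }

    @Override public synchronized void cancel() {
        done = true;
        if (pending != null) {
            pending.cancel(false); // no effect if onError is already running or finished
        }
    }

    /** Returns true if onError reached the subscriber despite request(-1) then cancel(). */
    static boolean errorDeliveredAfterCancel() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        exec.submit(() -> { gate.await(); return null; }); // keep the only worker busy
        AtomicBoolean gotError = new AtomicBoolean();
        Flow.Subscriber<Object> sub = new Flow.Subscriber<Object>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Object item) { }
            @Override public void onError(Throwable t) { gotError.set(true); }
            @Override public void onComplete() { }
        };
        OneShotErrorSubscription s = new OneShotErrorSubscription(sub, exec);
        s.request(-1);
        s.cancel();       // happens before the scheduled onError can start
        gate.countDown();
        exec.shutdown();
        try {
            exec.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return gotError.get();
    }
}
```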

>
> Line 98: the text reads "Subscriber maintains multiple Subscriptions", and I
> don't see what that would look like; I think the RS spec at least implies this
> shouldn't happen.

Right. The next sentence tells people what to do instead:

  * Subscription} are strictly ordered, there is no need for these
  * methods to use locks or volatiles unless a Subscriber maintains
  * multiple Subscriptions (in which case it is better to instead
  * define multiple Subscribers, each with its own Subscription).
  *

I suppose it could be strengthened by replacing "better" with
"less insane" :-)

>
> Line 114: the count variable is set after the request is issued so a synchronous
> onNext call will request bufferSize - bufferSize / 2 immediately, having a total
> of ~1.5 * bufferSize outstanding requests.

Not immediately: count is also initially set to: bufferSize - bufferSize / 2.

(It is nice to illustrate early on that people can use
techniques other than item-by-item request(1). This is the
least code-intensive one I could think of.)
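The batched-request technique can be sketched as follows. It is modelled on the sample Subscriber in the Flow javadoc rather than copied from it; BatchingSubscriber and run are hypothetical names, and SubmissionPublisher's default executor and buffer size are assumed. Setting count before calling request() avoids depending on whether the first onNext can arrive synchronously.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final long bufferSize;
    private final Consumer<? super T> consumer;
    private final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private long count;       // items still to arrive before the next top-up
    private int requestCalls; // bookkeeping for the demo only

    BatchingSubscriber(long bufferSize, Consumer<? super T> consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        // Set the countdown first, so it is already valid if onNext runs synchronously.
        count = bufferSize - bufferSize / 2;
        requestCalls++;
        s.request(bufferSize);
    }

    @Override public void onNext(T item) {
        if (--count <= 0) { // half a buffer consumed: top up by half a buffer
            requestCalls++;
            subscription.request(count = bufferSize - bufferSize / 2);
        }
        consumer.accept(item);
    }

    @Override public void onError(Throwable t) { finished.countDown(); }
    @Override public void onComplete() { finished.countDown(); }

    /** Publishes 1..n and returns {sum of items, number of request() calls}. */
    static long[] run(int n, long bufferSize) {
        long[] sum = {0};
        BatchingSubscriber<Integer> sub = new BatchingSubscriber<>(bufferSize, x -> sum[0] += x);
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (int i = 1; i <= n; i++) {
                pub.submit(i);
            }
        } // close(): onComplete follows the last buffered item
        try {
            if (!sub.finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {sum[0], sub.requestCalls};
    }
}
```

With bufferSize = 8 the subscriber asks for 8 items up front and then 4 more after every 4 consumed, so at most 8 items are ever outstanding; for 100 items that is 1 + 25 = 26 request() calls.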

>
> Line 416: the text reads "Preliminary release note: Currently, this method
> collects all items before executing the stream computation." Correct me if I'm
> wrong, but since Stream is pull-based, the stream() method would need to inject
> concurrency into the flow:

Yes, in general, it should. The implementation is currently a stand-in.

> Line 222: From experience with RxJava, holding a lock while emitting events
> should be avoided, especially when blocking actions are nearby.

Right. Almost everything is lock-free. However, there is nothing
preventing two different threads generating items into the
same SubmissionPublisher, so this lock keeps the class fully
thread-safe in this case. It does come at the expense of
possibly blocking during onSubscribe, but still seems to be
the best tradeoff.
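The multiple-producer case can be illustrated with a small sketch, assuming the JDK 9 SubmissionPublisher API: two threads submit into the same publisher, and the consume() convenience method (which requests unbounded demand) totals what arrives. TwoProducers is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.LongAdder;

class TwoProducers {
    /** Two threads each submit 1..perThread into one publisher; returns the consumed total. */
    static long run(int perThread) {
        SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
        LongAdder total = new LongAdder();
        // consume() subscribes a Consumer and requests unbounded demand for it.
        CompletableFuture<Void> done = pub.consume(x -> total.add(x));
        Runnable producer = () -> {
            for (int i = 1; i <= perThread; i++) {
                pub.submit(i); // may block while the subscriber's buffer is full
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        pub.close(); // completion is signalled after buffered items are delivered
        done.join(); // the future completes when the consumer sees onComplete
        return total.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(1000)); // prints 1001000
    }
}
```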

>
> Line 230: In RxJava, if the Subject is terminated with an exception, late
> Subscribers will receive the exception but here, they just get completed. Maybe
> it is worth considering this exception-replaying behavior.

Thanks. I'm going to sit on this for the moment. One alternative is
to add method isClosedExceptionally so that only those new Subscribers
that care how it was closed need to arrange special handling.

>
> Line 623: getSubscribers(): why would anyone be interested in the actual
> subscribers? Calling methods on them shouldn't be allowed anyway, and I don't see
> any other reason to expose them.

As noted in the javadoc, it is just for monitoring and management.
We have a lot of methods along these lines in j.u.c that aren't used much,
but people appreciate them when they need them.
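The monitoring accessors can be exercised like this, assuming the method names of the shipped JDK 9 SubmissionPublisher (getNumberOfSubscribers, isSubscribed, getSubscribers, estimateMaximumLag); MonitoringDemo is hypothetical. The lag figure is documented as an estimate, so it is printed rather than relied upon.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class MonitoringDemo {
    static String snapshot() {
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            // A subscriber that never requests, so submitted items stay buffered.
            Flow.Subscriber<String> idle = new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { }
                @Override public void onNext(String item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
            pub.subscribe(idle);
            pub.submit("a");
            pub.submit("b");
            // For monitoring and management only; do not invoke Subscriber methods on these.
            return "subscribers=" + pub.getNumberOfSubscribers()
                + " subscribed=" + pub.isSubscribed(idle)
                + " listed=" + pub.getSubscribers().size()
                + " maxLag~" + pub.estimateMaximumLag();
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```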

-Doug




Re: Upcoming jdk9 j.u.c JEP

Doug Lea
In reply to this post by Greg Wilkins

I've been encouraging people to discuss spec issues at
   https://github.com/reactive-streams/reactive-streams-jvm/issues
just to keep them in one place.

But this one might impact SubmissionPublisher (a potential j.u.c
class, not the spec itself):


On 07/23/2015 06:35 PM, Greg Wilkins wrote:

> Specifically that it is asymmetric and an error in the
> middle of a chain of processors can be propagated downstream with
> onError(Throwable) but can only be propagated upstream with cancel().
>

If a call to onNext throws an exception, the only thing guaranteed
is that the subscription will be cancelled. (There are also some
words saying that the Subscriber is at that point non-compliant.)
But I don't see anything that stops any publisher/subscription
from doing something with that exception before/during the
cancellation. So should there be a SubmissionPublisher method
along the lines of:

   onSubscriberException(Consumer<? extends Throwable> handler);

-Doug







Re: Upcoming jdk9 j.u.c JEP

Greg Wilkins

Doug,

Note that the specification says that onNext should not throw an exception:

"the only legal way for a Subscriber to signal failure is by cancelling its Subscription"

However it also specifies what to do if that rule is broken:

In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment

So, as there is no other legal outlet for failures in a subscriber, I'm guessing that lots of implementations will just throw unchecked exceptions from onNext (there's no point catching them if you have nothing you can do with them), and SubmissionPublisher needs to be able to deal with that. Currently it does so by feeding the exception back downstream by calling onError(Throwable).

I would think that it should call cancel() and then notify with a callback like you suggest (or perhaps the default onSubscriberException implementation should be cancel?).

So yes, I think the implementation needs to be improved in this regard, but I also think this is a good example of how not having a proper outlet for subscriber failures in the spec has confused an implementation... but I'll take that discussion back to the issue.

cheers










On 25 July 2015 at 06:26, Doug Lea <[hidden email]> wrote:


Re: Upcoming jdk9 j.u.c JEP

Viktor Klang
Note that since it is a spec for async, the invocation of onNext (called a signal in the spec) is divorced from the processing of said signal.

Let's keep the discussion in the aforementioned Issue, to avoid confusion.

On Fri, Jul 24, 2015 at 11:59 PM, Greg Wilkins <[hidden email]> wrote:

Doug,

Note that the specification says that onNext should not throw an exception:

"the only legal way for a Subscriber to signal failure is by cancelling its Subscription"

--
Cheers,


Re: Upcoming jdk9 j.u.c JEP

Doug Lea
In reply to this post by Doug Lea
On 07/23/2015 10:23 AM, Doug Lea wrote:
> API specs: http://gee.cs.oswego.edu/dl/jsr166/dist/docs/
> jar file: http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166.jar (compiled using
>  Java8 javac).
> Browsable CVS sources:
> http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/
>

Thanks to those sending comments and suggestions! A few follow-ups:

>
> * If you are trying out updates with jdk9 early access,
> (https://jdk9.java.net/download/) notice that as of b73 last week, it by
> default uses the G1 collector, which is (currently, but hopefully just
> transiently) not usually the best choice for high-throughput concurrency. You
> probably want to use switches -XX:+UseParallelGC -XX:+UseCondCardMark
> -XX:-UseBiasedLocking

To elaborate a little: The G1 collector sometimes places memory
fences after reference writes. These tend to be most common where
they are least desirable in concurrent applications; for example,
during producer-consumer element handoffs, which can significantly
impact performance. Some GC developers are looking into alternatives.

>
> ForkJoinPool ...  "Async" FJP mode is now probabilistically fair;

Thanks especially to Viktor Klang and Ron Pressler for helping
to refine this, and tease apart into two aspects (both of which
might be subject to further updates).

(1) Introducing a failsafe bound to avoid indefinite starvation
with unbounded loopy recursive tasks like ...
   class MyTask { ... void compute() { ... new MyTask().fork(); ...} }

The bound (of 1K tasks) forces worker threads to eventually try
looking elsewhere for tasks to execute.
By intent, eventually can be a long time.
The motivation is similar to spare-thread bounds -- to reduce
or avoid unbounded long-term resource issues without impacting
performance of well-behaved usages. There doesn't seem to be
a good reason to expose the bound as a tunable parameter.

(2) Providing a way to define tasks that might sometimes
prefer to poll external submissions rather than internal actions
like those in the above recursive loop. This amounts to exposing
ForkJoinPool.pollSubmission in ForkJoinTask, which in retrospect
was an oversight: pollSubmission is "protected" in ForkJoinPool,
but by Java protection rules was inaccessible from ForkJoinTask
subclasses, so needs an explicit relay method.
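In the released JDK (9 and later) this relay is the protected static method ForkJoinTask.pollSubmission(). The sketch below assumes that API; PoliteLoop is a hypothetical name. A single-worker pool keeps one long-running task busy, so an externally submitted task can only run early if that task polls it and runs it itself (a polled task is unscheduled, so the caller must execute it).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical long-running task that lets externally submitted work run first. */
class PoliteLoop extends RecursiveAction {
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch externalDone;
    final AtomicInteger ranViaPoll = new AtomicInteger();

    PoliteLoop(CountDownLatch externalDone) {
        this.externalDone = externalDone;
    }

    @Override protected void compute() {
        started.countDown();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (externalDone.getCount() > 0 && System.nanoTime() < deadline) {
            // ... one slice of this task's own work would go here ...
            ForkJoinTask<?> external = pollSubmission(); // null if none is available now
            if (external != null) {
                ranViaPoll.incrementAndGet();
                external.quietlyInvoke(); // a polled task is unscheduled: run it here
            } else {
                Thread.onSpinWait();
            }
        }
    }

    /** One worker, busy in compute(): the external task can only run early via polling. */
    static int demo() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch externalDone = new CountDownLatch(1);
        PoliteLoop loop = new PoliteLoop(externalDone);
        pool.execute(loop);
        try {
            loop.started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        pool.execute(externalDone::countDown); // an external submission
        loop.join();
        pool.shutdown();
        return loop.ranViaPoll.get();
    }
}
```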

On 07/24/2015 02:52 PM, Doug Lea wrote:
> On 07/23/2015 02:55 PM, Dávid Karnok wrote:
>> Line 230: In RxJava, if the Subject is terminated with an exception, late
>> Subscribers will receive the exception but here, they just get completed.
>> Maybe it is worth considering this exception-replaying behavior.
>
> Thanks. I'm going to sit on this for the moment. One alternative is to add
> method isClosedExceptionally so that only those new Subscribers that care
> how it was closed need to arrange special handling.

I added the exception-replay -- it simplifies the closeExceptionally spec.
I also added an accessor for the exception for use in other contexts.
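A sketch of the exception-replay behaviour, assuming the closeExceptionally and getClosedException methods of the shipped JDK 9 SubmissionPublisher; ReplayDemo is a hypothetical name. A Subscriber that arrives after the publisher has failed should receive onError with the original exception rather than onComplete.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ReplayDemo {
    /** Fails the publisher first, then subscribes; returns the error the late subscriber saw. */
    static Throwable lateSubscriberSees() {
        SubmissionPublisher<String> pub = new SubmissionPublisher<>();
        pub.closeExceptionally(new IllegalStateException("upstream failed"));

        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch signalled = new CountDownLatch(1);
        pub.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { seen.set(t); signalled.countDown(); }
            @Override public void onComplete() { signalled.countDown(); } // would mean no replay
        });
        try {
            signalled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The same exception is also available without subscribing:
        System.out.println("getClosedException() -> " + pub.getClosedException());
        return seen.get();
    }
}
```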

On 07/24/2015 04:26 PM, Doug Lea wrote:
> On 07/23/2015 06:35 PM, Greg Wilkins wrote:
>> Specifically that it is asymmetric and an error in the middle of a chain of
>> processors ....
>
> So should there be a SubmissionPublisher method along the lines of:
>
> onSubscriberException(Consumer<? extends Throwable> handler);
>

Actually, the handler (with two arguments) must be provided in
the constructor to be effective. Added.

I know that Greg still doesn't love some of the reactive-stream error
protocols, but adding this method covers the cases where a Publisher
or Processor knows how to cope with an onNext exception.
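In the API that shipped in JDK 9, this handler appears as the third argument of the constructor SubmissionPublisher(Executor, int, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable>). The sketch below assumes that signature; OnNextHandlerDemo is hypothetical, and its Subscriber throws from onNext on purpose, which the spec forbids.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class OnNextHandlerDemo {
    /** Returns the exception that reached the constructor-supplied handler. */
    static Throwable run() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        // Invoked with (subscriber, exception) when that subscriber's onNext throws,
        // before its subscription is cancelled.
        SubmissionPublisher<String> pub = new SubmissionPublisher<String>(
                exec, Flow.defaultBufferSize(),
                (subscriber, ex) -> { seen.set(ex); handled.countDown(); });
        pub.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) {
                // Deliberately non-compliant: the spec says onNext should not throw.
                throw new IllegalArgumentException("cannot encode " + item);
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        pub.submit("item-1");
        try {
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pub.close();
        exec.shutdown();
        return seen.get();
    }
}
```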

-Doug



Re: Upcoming jdk9 j.u.c JEP

oleksandr otenko
In reply to this post by Roland Kuhn-2
The direction of the data flow is not important, so upstream vs. downstream is irrelevant. The important aspect is the control flow.

There are push-streams and there are pull-streams. The error should propagate to the initiator.

Alex

On 24/07/2015 08:12, Roland Kuhn wrote:
Hi Greg,

my reply has obviously opened two different discussions (namely “why are things as they are?” and “what is the suggested change all about”), I think it would be most fruitful if we stash the first one for now and come back to it after the second one has been understood better—at least by myself. That will put us into a better situation for judging the big picture.

Considering the flow of data from the DB via application and framework processors into the Servlet container, at any point along this line failures can happen. The component emitting the failure will use whatever means it has outside of Reactive Streams to log/audit/monitor and provide metrics, I assume that that is just part of all reasonable code; the database will do that, the application will do it, the framework will probably allow the application to configure how to do that, and the application server will be configured how to do that. This means that everyone can debug their own failures.

Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction. This also allows downstream components to see failures coming from upstream, but this is a byproduct of needing to generate the right kind of final response to the external client. Now the interesting question is: why would the database need to know that some downstream component choked on the data it emitted? How exactly would this information be used by the database or its operators/programmers? Arguably the data exist and are “correct” by definition, guarded by Java types, and any validation errors that occur are not stream failures (cf. this definition) and should be treated as normal data elements and sent downstream (or filtered out, depending on the requirements & protocol).

I am deliberately painting with high contrast colors here in order to better understand what exactly it is that you want to achieve instead of just discussing the proposed solution, thanks for your patience!

Regards,

Roland

24 jul 2015 kl. 08:31 skrev Greg Wilkins <[hidden email]>:

Roland,

thanks for the response.

But I don't understand why you consider a terminal exception being notified upstream to be a data flow. It is data, but it is not a flow, because it is terminal and cannot be used as a back channel.

Implementations of the API are already required to send data upstream: cancellation is a terminal boolean data state that must be sent upstream, and request(long) is a flow of numbers that must be sent upstream. [As an aside, it is not beyond imagination that request(long) will be misused as a back channel for data; it might even get used to send an error code immediately before or after a cancel!]

Thus I don't see that there is any significant additional complexity with that cancellation having a reason associated with it.   Implementations must already support upward bound data and any sequencing and/or race conditions that exist with cancel(Throwable) also exist with just cancel().
 
I also dispute that a Subscriber will be under the control of the Publisher. In the example cited, an application is providing a Processor that uses a Publisher provided by a third-party database and a Subscriber provided by the Servlet container, with perhaps some framework-provided Processors for serialization. In this example, components from at least four different code sources may be combined in a chain that crosses the deployment and administration boundaries of database, application and server. The log-and-cancel handling of errors is going to be very difficult because many different logging mechanisms may be in use and access may not be easily achieved; i.e., application developers may not have full visibility of database logs or servlet container logs.

The errors I'm concerned about are all terminal-style errors, not intended to be a back flow of data nor an acknowledgement of messages sent. It is probable that implementers of cancel(Throwable) would just log, cancel themselves and pass the cancel(Throwable) on to any of their Subscriptions. The point, however, is that this would allow the reason for the failure to cross the administrative boundaries so that it can be known to all.

I think that any argument that can be made for not sending a Throwable upstream can equally be made for not sending one downstream (or for not having exceptions in the Java language at all). Exceptions are very rarely handled in any meaningful way, but they are extremely useful for passing on the details of a failure so that they may be known to all who need to know.

Without exceptions, I'm imagining many, many Stack Overflow questions like "Why was my Subscription cancelled?" followed by the obligatory "RTFLog Stupid!" responses!

cheers

















On 24 July 2015 at 15:55, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

the reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication; bidirectional conversations would utilize two such streams running in opposite directions. This means that for a single stream, data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know when and if to produce the next element(s), hence we didn’t see a use-case for propagating more information than “N elements needed” and “no more elements needed”.

If a single Reactive Stream could transport data upstream then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason why we made this separation lies in not burdening the API designers of conforming implementations with an impossible task: the combinators offered on stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages but with data flowing upstream there would be the need for also describing how to handle that—even if it is “only” an error channel—and since these data flow in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery in the sense that the Publisher would then construct and attach a different Subscriber afterwards—please let me know if you have something else in mind—and if you want to do that then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe it could well be a simpler callback mechanism and might not need full back-pressure.

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.

Regards,

Roland

24 jul 2015 kl. 00:35 skrev Greg Wilkins <[hidden email]>:



On 24 July 2015 at 00:23, Doug Lea <[hidden email]> wrote:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks.  The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent (2) Most net-based frameworks seem to use
custom data representation etc (e.g., JSON) that are even further out
of scope.  However class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe).



Doug et al,

The Jetty project has been experimenting with the Reactive Streams API (https://github.com/jetty-project/jetty-reactive), albeit not with the JDK 9 version of it, but inspired by its proposed inclusion.

We very much like the API and what it can bring to our space. We don't see that it needs direct IO support; its power is actually in bridging domains with a good asynchronous model that supports flow control.

We've also begun some preliminary discussions about developing an RS-based proposal for the Servlet 4.0 specification. Currently the Servlet API does support asynchronous IO and behaviour well, but the API is deceptively difficult to use correctly and gives no support for back pressure. With RS we can envisage solutions that look like:
  • A database provides an RS Publisher that delivers the large results of a query asynchronously from a remote database server
  • Some business logic is encapsulated as an RS Processor subscribed to the database Publisher
  • Some framework-provided Processors subscribe to the business logic Processor to perform a chain of functions such as serialization and compression
  • A container-provided Subscriber terminates the chain and sends the resulting bytes out over HTTP, HTTP/2 or WebSocket. The flow-control mechanisms of these protocols would be the basis of the RS back pressure.
In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.

However, we have a significant concern with the API: we do not like its error-handling design. Specifically, it is asymmetric: an error in the middle of a chain of processors can be propagated downstream with onError(Throwable) but can only be propagated upstream with cancel().

We believe that cancel without a reason is an insufficient semantic on which to build a robust ecosystem of RS Processors for assembling applications. Consider the above example: ideally, object serialization would be handled by a third-party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent a non-JSONable object, or if the JSON converter was incorrectly configured, then the JSONEncodingProcessor could encounter an error during its onNext(Object item) handling, and its only permitted response is to cancel the stream, without explanation.

I have raised this as an issue on the RS GitHub, and the current recommendation is to log and cancel (see https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544). However, I believe that log-and-cancel is an insufficient semantic. Logging in assembled applications is often fraught, as each component provider will fight over which logging framework is best. RS chains may cross jurisdictional boundaries, and logs may not even be readily available.

The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.

When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!

cheers

--
Greg Wilkins <[hidden email]>


Re: Upcoming jdk9 j.u.c JEP

Viktor Klang
#define initiator

On Mon, Aug 10, 2015 at 12:00 PM, Oleksandr Otenko <[hidden email]> wrote:
The direction of the data flow is not important, so upstream vs downstream are irrelevant. The important aspect is the control flow.

There are push-streams and there are pull-streams. The error should propagate to the initiator.

Alex


On 24/07/2015 08:12, Roland Kuhn wrote:
Hi Greg,

my reply has obviously opened two different discussions (namely “why are things as they are?” and “what is the suggested change all about”), I think it would be most fruitful if we stash the first one for now and come back to it after the second one has been understood better—at least by myself. That will put us into a better situation for judging the big picture.

Considering the flow of data from the DB via application and framework processors into the Servlet container, at any point along this line failures can happen. The component emitting the failure will use whatever means it has outside of Reactive Streams to log/audit/monitor and provide metrics, I assume that that is just part of all reasonable code; the database will do that, the application will do it, the framework will probably allow the application to configure how to do that, and the application server will be configured how to do that. This means that everyone can debug their own failures.

Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction. This also allows downstream components to see failures coming from upstream, but this is a byproduct of needing to generate the right kind of final response to the external client. Now the interesting question is: why would the database need to know that some downstream component choked on the data it emitted? How exactly would this information be used by the database or its operators/programmers? Arguably the data exist and are “correct” by definition, guarded by Java types, and any validation errors that occur are not stream failures (cf. this definition) and should be treated as normal data elements and sent downstream (or filtered out, depending on the requirements & protocol).

I am deliberately painting with high contrast colors here in order to better understand what exactly it is that you want to achieve instead of just discussing the proposed solution, thanks for your patience!

Regards,

Roland

24 jul 2015 kl. 08:31 skrev Greg Wilkins <[hidden email][hidden email]>:

Roland,

thanks for the response.

But I don't understand why you consider a terminal exception being notified upstream as a data flow?   It is data, but it is not a flow because it is terminal and cannot be used as a back channel.

Implementations of the API are already required to send data upstream:  Cancellation is a terminal boolean data state that must be sent upstream, and request(int) is a flow of integers that must be sent upstream [and as an aside, it is not beyond imagination that request(int) will be misused as a back channel for data - hey it might even get used to send an error code immediately prior/post to a cancel! ]

Thus I don't see any significant additional complexity in that cancellation having a reason associated with it. Implementations must already support upward-bound data, and any sequencing and/or race conditions that exist with cancel(Throwable) also exist with plain cancel().

I also dispute that a Subscriber will be under the control of the Publisher. In the example cited, an application is providing a Processor that uses a Publisher provided by a 3rd-party database and a Subscriber provided by the Servlet container, with perhaps some framework-provided Processors for serialization. In this example, components from at least 4 different code sources may be combined in a chain that crosses the deployment and administration boundaries of the database, the application and the server. Log & cancel handling of errors is going to be very difficult, because many different log mechanisms may be in use and access to them may not be easy, i.e. application developers may not have full visibility of the database logs or the servlet container logs.

The errors I'm concerned about are all terminal-style errors, not intended to be a back flow of data nor an acknowledgement of messages sent. It is probable that implementers of cancel(Throwable) would just log, cancel themselves and pass the cancel(Throwable) on to any of their Subscriptions. However, the point is that this would allow the reason for the failure to cross the administrative boundaries so that it can be known to all.

I think that any argument that can be made for not sending a Throwable upstream can equally be made for not sending one downstream (or for not having any exceptions in the Java language). Exceptions are very rarely handled in any meaningful way, but they are extremely useful for passing on the details of a failure so that they may be known to all who need to know.

Without exceptions, I'm imagining many, many Stack Overflow questions like "Why was my Subscription cancelled?" followed by the obligatory "RTFLog Stupid!" responses!

cheers

On 24 July 2015 at 15:55, Roland Kuhn <[hidden email]> wrote:
Hi Greg,

the reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication; bidirectional conversations would use two such streams running in opposite directions. This means that for a single stream, data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know when and if to produce the next element(s), hence we didn’t see a use-case for propagating more information than “N elements needed” and “no more elements needed”.
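A minimal, single-threaded sketch of what "only demand flows upstream" means for a producer: the hypothetical RangePublisher below emits elements only while outstanding demand from request(n) remains. It guards against re-entrant request() calls made from inside onNext, but it is not thread-safe and is not a complete Reactive Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Single-threaded sketch: elements are produced only in response to request(n).
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int position = 0;         // next value to emit
            private long demand = 0;          // requested but not yet delivered
            private boolean emitting = false; // true while the loop below runs
            private boolean done = false;     // cancelled, completed or failed

            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) { // non-positive demand is treated as an error
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive: " + n));
                    return;
                }
                demand += n;
                if (demand < 0) demand = Long.MAX_VALUE; // saturate on overflow
                if (emitting) return; // re-entrant call from onNext: the running loop picks it up
                emitting = true;
                while (demand > 0 && position < count && !done) {
                    demand--;
                    subscriber.onNext(position++);
                }
                if (position == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
                emitting = false;
            }

            @Override public void cancel() { done = true; }
        });
    }

    // Subscribes with a fixed initial demand and returns what was actually produced.
    static List<Integer> collectWithDemand(int count, long demand) {
        List<Integer> received = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }
}
```

With five elements available but a demand of only two, collectWithDemand(5, 2) yields just the first two; the publisher never produces the rest.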

If a single Reactive Stream could transport data upstream then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason why we made this separation lies in not burdening the API designers of conforming implementations with an impossible task: the combinators offered on stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages but with data flowing upstream there would be the need for also describing how to handle that—even if it is “only” an error channel—and since these data flow in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery in the sense that the Publisher would then construct and attach a different Subscriber afterwards—please let me know if you have something else in mind—and if you want to do that then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe it could well be a simpler callback mechanism and might not need full back-pressure.
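As one concrete instance of such a callback, the SubmissionPublisher class proposed for jdk9 accepts an optional handler in its (Executor, int, BiConsumer) constructor; per its Javadoc, the handler is invoked when a Subscriber's onNext throws, before that subscription is cancelled. The sketch below assumes that behaviour. The throwing subscriber misbehaves on purpose (Reactive Streams requires onNext to return normally) and exists only to trigger the hook; OnNextFailureCallback is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OnNextFailureCallback {
    // Returns the message of the exception that reached the publisher-side handler.
    static String failureSeenByPublisher() {
        CompletableFuture<Throwable> seen = new CompletableFuture<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<String>(
                ForkJoinPool.commonPool(),
                Flow.defaultBufferSize(),
                (subscriber, ex) -> seen.complete(ex)); // publisher-side callback for onNext failures
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

            @Override public void onNext(String item) {
                // Stands in for a serializer that chokes on an item.
                if (item.startsWith("bad")) throw new IllegalStateException("cannot encode " + item);
            }

            @Override public void onError(Throwable t) { }

            @Override public void onComplete() { }
        });
        publisher.submit("good");
        publisher.submit("bad-item");
        publisher.close();
        return seen.orTimeout(5, TimeUnit.SECONDS).join().getMessage();
    }
}
```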

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.

Regards,

Roland

On 24 Jul 2015, at 00:35, Greg Wilkins <[hidden email]> wrote:



On 24 July 2015 at 00:23, Doug Lea <[hidden email]> wrote:

* Reactive-stream users may be disappointed that we do not include any
net/IO-based Flow.Publisher/Subscriber classes, considering that
reactive-streams are mainly motivated by net-based frameworks.  The
reasons for triaging these out are that (1) IO generally falls outside
of java.util.concurrent (2) Most net-based frameworks seem to use
custom data representation etc (e.g., JSON) that are even further out
of scope.  However class SubmissionPublisher can be used as an adaptor
to turn just about any kind of source into a Publisher, so provides a
nearly universal way of constructing a good non-custom Publisher even
from IO-based sources.  (Also notice that SubmissionPublisher can
serve as the basis of other actor-like frameworks, including those
turning off back-pressure by calling
subscription.request(Long.MAX_VALUE) in onSubscribe).
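A minimal sketch of both points in Doug's note, assuming the jdk9 SubmissionPublisher API: a blocking Reader (standing in for an IO source) is adapted into a Publisher simply by submitting each line, and the subscriber turns off back-pressure by requesting Long.MAX_VALUE in onSubscribe. UnboundedSubscriber and ReaderAdapter are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Turns off back-pressure: all demand is granted up front.
final class UnboundedSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> items = new ArrayList<>();
    final CompletableFuture<List<T>> done = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { done.completeExceptionally(t); }
    @Override public void onComplete() { done.complete(items); }
}

final class ReaderAdapter {
    // Adapts a blocking line source into a Publisher by submitting each line.
    static List<String> readAll(Reader source) {
        UnboundedSubscriber<String> subscriber = new UnboundedSubscriber<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        try (BufferedReader lines = new BufferedReader(source)) {
            for (String line; (line = lines.readLine()) != null; ) {
                publisher.submit(line); // blocks while the subscriber's buffer is full
            }
            publisher.close();               // end of input: subscribers get onComplete
        } catch (IOException e) {
            publisher.closeExceptionally(e); // read failure: subscribers get onError
        }
        return subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

Because submit() blocks while a subscriber's buffer is full, the reading thread is still throttled by a slow consumer, even though that consumer never limits its demand.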



Doug et al,

The Jetty project has been experimenting with the reactive streams API (https://github.com/jetty-project/jetty-reactive), albeit not with the JDK-9 version of it, but inspired by its proposed inclusion.

We very much like the API and what it can bring to our space. We don't see that it needs direct IO support; its power is actually in bridging domains with a good asynchronous model that supports flow control.

We've also begun some preliminary discussions about developing an RS-based proposal for the Servlet 4.0 specification. Currently the Servlet API does support asynchronous IO and behaviour well, but the API is deceptively difficult to use correctly and gives no support for back pressure. With RS we can envisage solutions that look like:
  • A database provides an RS Publisher that delivers the large results of a query asynchronously from a remote database server
  • Some business logic is encapsulated as an RS Processor subscribed to the database Publisher
  • Some framework-provided Processors subscribe to the business logic Processor to perform a chain of functions such as serialization and compression
  • A container-provided Subscriber terminates the chain and sends the resulting bytes out over HTTP/HTTP2 or WebSocket. The flow control mechanisms of these protocols would be the basis of the RS back pressure.
In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.
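The chain described above can be sketched with the jdk9 classes as follows. TransformProcessor is modelled on the example in the SubmissionPublisher Javadoc; the publisher standing in for the database, the uppercase "business logic", the quoting "serializer" and the collecting consumer standing in for the container are all hypothetical. Each stage requests one element at a time from upstream, so once the intermediate buffers fill up, a slow consumer at the end eventually blocks submit() at the source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// One stage of the chain: requests one item at a time from upstream and
// republishes the transformed item to its own subscribers.
class TransformProcessor<S, T> extends SubmissionPublisher<T> implements Flow.Processor<S, T> {
    private final Function<? super S, ? extends T> function;
    private Flow.Subscription subscription;

    TransformProcessor(Function<? super S, ? extends T> function) {
        super(ForkJoinPool.commonPool(), Flow.defaultBufferSize());
        this.function = function;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        (this.subscription = subscription).request(1);
    }

    @Override public void onNext(S item) {
        subscription.request(1);
        submit(function.apply(item)); // blocks while downstream buffers are full
    }

    @Override public void onError(Throwable ex) { closeExceptionally(ex); }

    @Override public void onComplete() { close(); }
}

final class Pipeline {
    static List<String> run(List<String> rows) {
        SubmissionPublisher<String> database = new SubmissionPublisher<>(); // stand-in for the DB
        TransformProcessor<String, String> business = new TransformProcessor<>(String::toUpperCase);
        TransformProcessor<String, String> serializer = new TransformProcessor<>(s -> "\"" + s + "\"");
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        database.subscribe(business);
        business.subscribe(serializer);
        CompletableFuture<Void> done = serializer.consume(sent::add); // stand-in for the container
        rows.forEach(database::submit);
        database.close(); // completion propagates down the chain via onComplete -> close()
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return sent;
    }
}
```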

However, we have a significant concern with the API: we do not like its error handling design. Specifically, it is asymmetric: an error in the middle of a chain of processors can be propagated downstream with onError(Throwable) but can only be propagated upstream with cancel().

We believe that cancel without a reason is an insufficient semantic on which to build a robust ecosystem of RS Processors that can be used to build applications. Consider the above example: it would be ideal if the object serialization were handled by a 3rd-party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent a non-JSON-encodable object, or if the JSON converter were incorrectly configured, then the JSONEncodingProcessor could encounter an error during its onNext(Object item) handling, and its only permitted response is to cancel the stream, without explanation.
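Here is a sketch of what such a processor can do under the current API, assuming the jdk9 SubmissionPublisher class. JsonEncodingProcessor and its toy encoding rule are hypothetical. On an encoding failure it cancels its upstream subscription, which carries no reason, and calls closeExceptionally, so only the downstream side receives the actual exception via onError. This is the asymmetry described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical serialization stage: only Strings and Numbers are "encodable" in this toy.
final class JsonEncodingProcessor extends SubmissionPublisher<String>
        implements Flow.Processor<Object, String> {
    private Flow.Subscription upstream;

    static String encode(Object item) {
        if (item instanceof Number) return item.toString();
        if (item instanceof String) return "\"" + item + "\"";
        throw new IllegalArgumentException("not JSON-encodable: " + item.getClass().getName());
    }

    @Override public void onSubscribe(Flow.Subscription s) { (upstream = s).request(1); }

    @Override public void onNext(Object item) {
        if (isClosed()) return;
        String json;
        try {
            json = encode(item);
        } catch (IllegalArgumentException e) {
            upstream.cancel();     // upstream learns only that the stream was cancelled
            closeExceptionally(e); // downstream receives the actual reason via onError
            return;
        }
        upstream.request(1);
        submit(json);
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }

    @Override public void onComplete() { close(); }

    // Returns the message of the error that reached the downstream consumer, or null.
    static String errorSeenDownstream(Object... items) {
        SubmissionPublisher<Object> businessLogic = new SubmissionPublisher<>();
        JsonEncodingProcessor encoder = new JsonEncodingProcessor();
        businessLogic.subscribe(encoder);
        CompletableFuture<Void> done = encoder.consume(json -> { });
        for (Object item : items) businessLogic.submit(item);
        businessLogic.close();
        try {
            done.orTimeout(5, TimeUnit.SECONDS).join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }
}
```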

I have raised this as an issue on the RS GitHub, and the current recommendation is to log and cancel (https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544). However, I believe that log and cancel is an insufficient semantic. Logging in assembled applications is often fraught, as each component provider will fight over which logging framework is best. RS chains may cross jurisdictional boundaries, and logs may not even be readily available.

The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.
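To make the proposal concrete, here is a hypothetical sketch of the "augment" option. ReasonedSubscription, SourceSubscription and RelayingSubscription are invented for illustration and exist in neither Flow nor Reactive Streams. Following the handling Greg anticipates elsewhere in this thread (log, cancel yourself, pass the reason on), each intermediate stage records the reason and forwards it upstream, and plain cancel() is kept as a default method.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Flow;

// HYPOTHETICAL: not part of Flow or Reactive Streams; models cancel(Throwable reason).
interface ReasonedSubscription extends Flow.Subscription {
    void cancel(Throwable reason);

    // Plain cancel() stays available and means "no particular reason".
    @Override default void cancel() {
        cancel(new CancellationException("cancelled without reason"));
    }
}

// The data source end: it can now see why the stream was cancelled.
final class SourceSubscription implements ReasonedSubscription {
    Throwable cancelReason; // null until cancelled

    @Override public void request(long n) { /* demand handling omitted in this sketch */ }

    @Override public void cancel(Throwable reason) {
        if (cancelReason == null) cancelReason = reason;
    }
}

// An intermediate stage: log, cancel itself, and pass the reason further upstream.
final class RelayingSubscription implements ReasonedSubscription {
    private final String stageName;
    private final ReasonedSubscription upstream;
    private final List<String> log;
    private boolean cancelled;

    RelayingSubscription(String stageName, ReasonedSubscription upstream, List<String> log) {
        this.stageName = stageName;
        this.upstream = upstream;
        this.log = log;
    }

    @Override public void request(long n) {
        if (!cancelled) upstream.request(n);
    }

    @Override public void cancel(Throwable reason) {
        if (cancelled) return;
        cancelled = true;
        log.add(stageName + " cancelled: " + reason.getMessage());
        upstream.cancel(reason);
    }
}
```

In the Servlet example, a failure detected by the serialization stage would then reach the database end with its message intact, instead of as an unexplained cancel().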

When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!

cheers

--
Greg Wilkins <[hidden email]>

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

--
I'm a physicist: I have a basic working knowledge of the universe and everything it contains!
    - Sheldon Cooper (The Big Bang Theory)

--
Cheers,


Re: Upcoming jdk9 j.u.c JEP

oleksandr otenko
The initiator of the next data transfer. Push-stream: the producer initiates the transfer of the next portion of the data. Pull-stream: the consumer initiates the transfer of the next portion of the data. There are mixes of these, too (e.g. a parser: on detecting a token, the producer pushes; then the token-handling code pulls the rest of the expression, and only the expression, which the producer can't do because it doesn't know what the expression is).


Alex

On 10/08/2015 12:38, Viktor Klang wrote:
#define initiator

On Mon, Aug 10, 2015 at 12:00 PM, Oleksandr Otenko <[hidden email]> wrote:
The direction of the data flow is not important, so upstream vs. downstream is irrelevant. The important aspect is the control flow.

There are push-streams and there are pull-streams. The error should propagate to the initiator.
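A synchronous Java sketch of this rule, using the notion of initiator defined above: in a pull stream the consumer initiates each transfer, so a producer failure arrives in the consumer's own call; in a push stream the producer initiates, so a consumer failure arrives in the producer's call. Initiator and failingProducer are hypothetical names. With asynchronous Flow signals delivered on other threads there is no such enclosing call to throw into, so error routing has to be designed into the API explicitly.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

final class Initiator {
    // Pull: the consumer initiates every transfer, so a producer failure
    // surfaces as an exception in the consumer's own call to next().
    static String pull(Iterator<Integer> producer) {
        try {
            while (producer.hasNext()) producer.next();
            return "consumer finished";
        } catch (RuntimeException e) {
            return "consumer saw: " + e.getMessage();
        }
    }

    // Push: the producer initiates every transfer, so a consumer failure
    // surfaces as an exception in the producer's own call to accept().
    static String push(List<Integer> items, Consumer<Integer> consumer) {
        try {
            items.forEach(consumer);
            return "producer finished";
        } catch (RuntimeException e) {
            return "producer saw: " + e.getMessage();
        }
    }

    // Hypothetical producer that fails when asked for element number `failAt`.
    static Iterator<Integer> failingProducer(int size, int failAt) {
        return new Iterator<Integer>() {
            private int position = 0;

            @Override public boolean hasNext() { return position < size; }

            @Override public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                if (position == failAt) throw new IllegalStateException("producer failed at " + position);
                return position++;
            }
        };
    }
}
```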

Alex


On 24/07/2015 08:12, Roland Kuhn wrote:
Hi Greg,

my reply has obviously opened two different discussions (namely “why are things as they are?” and “what is the suggested change all about”), I think it would be most fruitful if we stash the first one for now and come back to it after the second one has been understood better—at least by myself. That will put us into a better situation for judging the big picture.

Considering the flow of data from the DB via application and framework processors into the Servlet container, at any point along this line failures can happen. The component emitting the failure will use whatever means it has outside of Reactive Streams to log/audit/monitor and provide metrics, I assume that that is just part of all reasonable code; the database will do that, the application will do it, the framework will probably allow the application to configure how to do that, and the application server will be configured how to do that. This means that everyone can debug their own failures.

Data are flowing towards the Servlet (destined for whichever client made the request) and it is important to signal abnormal termination differently from normal termination, hence the onError propagation in this direction. This also allows downstream components to see failures coming from upstream, but this is a byproduct of needing to generate the right kind of final response to the external client. Now the interesting question is: why would the database need to know that some downstream component choked on the data it emitted? How exactly would this information be used by the database or its operators/programmers? Arguably the data exist and are “correct” by definition, guarded by Java types, and any validation errors that occur are not stream failures (cf. this definition) and should be treated as normal data elements and sent downstream (or filtered out, depending on the requirements & protocol).

I am deliberately painting with high contrast colors here in order to better understand what exactly it is that you want to achieve instead of just discussing the proposed solution, thanks for your patience!

Regards,

Roland
