Re: jdk9 Candidate classes Flow and SubmissionPublisher


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Greg Wilkins

Hi,

I'm looking at the Flow classes as a potentially better API for asynchronous IO than the current Servlet async API for the Jetty webserver.

However, it is not clear to me that the currently proposed API is sufficient for types that are naturally delivered in aggregations.

Specifically, to replace Servlet async IO, I semantically need a Flow.Subscriber<byte>, so that calls to Flow.Subscription.request(long n) can be expressed in terms of the number of bytes the subscriber is prepared to receive. However, delivering those bytes as successive calls to Flow.Subscriber.onNext(byte item) would be highly inefficient.

How should such aggregations be handled? Note that the same goes for characters in Strings/CharSequences/Messages, and probably for any Flow of objects whose size can vary greatly, where the ability to receive depends more on the size of the items than on their number.


To me it makes little sense to declare the subscriber in terms of byte or character, as they are not the preferred delivery types. Rather, the type should be declared as the aggregate type (ByteBuffer, CharSequence, WebSocketMessage, etc.).

What is then needed is for the subscriber to be able to request both a number of items and a total size. I.e. perhaps a method could be added to Flow.Subscription:

  request(long items, long totalSize)

The semantics could be that a Subscriber may call either request method, with a call to request(long n) being equivalent to request(n, Long.MAX_VALUE).
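To make the shape of that suggestion concrete, it could look roughly like the sketch below (the SizedSubscription name and the default-method bridge are purely illustrative; nothing like this exists in the candidate Flow API, and I am assuming its java.util.concurrent placement):

import java.util.concurrent.Flow;

// Illustrative sketch only: a size-aware extension of Flow.Subscription.
interface SizedSubscription extends Flow.Subscription {

  // Request up to `items` further items whose cumulative size (in bytes,
  // characters, or whatever unit the stream measures) does not exceed
  // `totalSize`.
  void request(long items, long totalSize);

  // The existing single-argument request then becomes the unbounded-size case.
  @Override
  default void request(long items) {
    request(items, Long.MAX_VALUE);
  }
}

A ByteBuffer-typed subscriber could then express its readiness in bytes while still receiving whole buffers.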

cheers





--
Greg Wilkins <[hidden email]>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang
Hi Greg,

May I suggest keeping the discussion in a single place, saves a lot of duplicate answers:

https://github.com/reactive-streams/reactive-streams-jvm/issues/270
--
Cheers,


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Greg Wilkins

On 2 June 2015 at 20:56, Viktor Klang <[hidden email]> wrote:
Hi Greg,

May I suggest keeping the discussion in a single place, saves a lot of duplicate answers:



Viktor,

now that discussion on that issue has run to a reasonable conclusion on my initial concerns, is it more appropriate to continue general discussions here rather than as comments on a closed issue?

So with the jetty-reactive experiments we are doing, we are not looking at providing a full Reactive Streams implementation, but rather a library of Publishers, Subscribers and Processors aimed at adapting async services within jetty to the RS API. Things such as:
  • Servlet Async IO
  • Websockets
  • HTTP2 Push
  • Servlet Request and Responses as RSs

As the #270 issue shows, it is still very early days for us understanding the paradigm, but I think we've made a lot of progress and we're liking the API so far.

However, one concern that we currently have is the lack of acknowledgement of success or failure in the reverse direction to the RS.  

Receiving acknowledgement of success would make it possible for a publisher to recycle published items (eg reuse buffers or put them back in a buffer pool).   But perhaps more importantly, acknowledgement of failure is very important for making robust scalable servers.

Our experience with asynchronous code is that making it work well is the easy part, often taking a few short hours of coding. It is making it fail well that is the really hard part, and it can take months of hard work even to understand all the failure modes, let alone deal with them properly without leaking resources, etc.

So with the standard RS API, we can see how you can push failure notification down a stream using Subscriber.onError(Throwable), but we can't see a standard way of sending failure notification up a stream.

For example, consider a Publisher connected to a chain of Processors, finishing with a final Subscriber that is actually an adapter over a servlet output stream. Somewhere in that chain there will be a processor that serializes the item objects or converts them to JSON.

Let's say there is an exception serializing one of these objects; what should the processor do? Well, it can tell its subscriber with onError(Throwable) that there was a problem, but in this case the subscriber can do very little with that knowledge other than log the throwable and close its servlet stream. But there is no standard way for the processor to tell the publisher that sent the bad object that there was a problem with it.

It can call Subscription.cancel(), but that does not provide any information about what failed. It can't just throw an exception because, in an async environment, the processing of the passed objects may well end up in the calling stack of the servlet container calling onWritePossible, which calls subscription.request(n). Thus a thrown exception may end up going down the stream rather than up.
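In code, the constraint looks something like this (JsonProcessor and toJson() are just placeholders for the kind of serializing processor I mean, not real Jetty or JDK classes):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Flow;

// Placeholder sketch of the serializing processor described above.
abstract class JsonProcessor<T> implements Flow.Processor<T, ByteBuffer> {
  protected Flow.Subscription upstream;                      // set in onSubscribe
  protected Flow.Subscriber<? super ByteBuffer> downstream;  // set in subscribe

  @Override
  public void onNext(T item) {
    try {
      byte[] json = toJson(item);                  // may fail for a bad item
      downstream.onNext(ByteBuffer.wrap(json));
    } catch (Exception failure) {
      // All the current API offers: stop the stream in both directions.
      upstream.cancel();                           // no way to say *why* to the publisher
      downstream.onError(failure);                 // subscriber can only log and close
    }
  }

  // Placeholder serialization; the real thing would use a JSON library.
  protected byte[] toJson(T item) throws Exception {
    return item.toString().getBytes(StandardCharsets.UTF_8);
  }
}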

So why is onError on the Subscriber and cancel on Subscription? For our initial thinking on error handling it should be the other way around, so that exceptions go upstream, not down. Our rough early thinking is that it would make more sense to have:

interface Publisher<T> {
  void subscribe(Subscriber<T> s);
}

interface Subscriber<T> {
  void onSubscription(Subscription<T> s);
  void onNext(T item);
  void onCompleted();
  void cancel();
}

interface Subscription<T> {
  void request(long n);
  void acknowledge(T item);
  void onError(Throwable x);
}

Thus exceptions would propagate upstream and cancellation would go downstream, which is the opposite of what the API has today. I've also added an upstream acknowledge(T item) to support success notification, which would allow transactions to be closed, buffers to be recycled, etc.

Now, having just been schooled in the whole way this API works asynchronously... I'm expecting that I've perhaps not understood how error handling is intended to be used.

cheers

--
Greg Wilkins <[hidden email]>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang
Hi Greg,

Thanks for the feedback!

RS is as minimalistic as possible, focusing on solving a well-defined and narrow set of problems, so that more sophisticated solutions can be built on top. This is why communication is essentially uni-directional (you can have a pair of unidirectionals to get bidirectionality).

On Thu, Jun 4, 2015 at 1:19 AM, Greg Wilkins <[hidden email]> wrote:

On 2 June 2015 at 20:56, Viktor Klang <[hidden email]> wrote:
Hi Greg,

May I suggest keeping the discussion in a single place, saves a lot of duplicate answers:



Viktor,

now that discussion on that issue has run to a reasonable conclusion on my initial concerns, is it more appropriate to continue general discussions here rather than as comments on a closed issue?

Having read your email I'll have to disagree: since you/we are talking about why RS wasn't implemented in some other way, I think it still belongs as a discussion over at RS—to keep things close to where they belong. Would you agree?
 

So with the jetty-reactive experiments we are doing, we are not looking at providing a full Reactive Streams implementation, but rather a library of Publishers, Subscribers and Processors

There's no such thing as a "full Reactive Streams" implementation: if you implement Publisher, Processor or Subscriber and it passes the TCK, it is for all intents and purposes an implementation of Reactive Streams :)
 
aimed at adapting async services within jetty to the RS API. Things such as:
  • Servlet Async IO
  • Websockets
  • HTTP2 Push
  • Servlet Request and Responses as RSs

As the #270 issue shows, it is still very early days for us understanding the paradigm, but I think we've made a lot of progress and we're liking the API so far.


I'm delighted to hear this! Keep the feedback flowing
 

However, one concern that we currently have is the lack of acknowledgement of success or failure in the reverse direction to the RS.

This is by design. Acks are expensive as they double the number of messages that have to travel (think of Reactive Streams as a protocol), so if you need to ack every message you'll have to send twice as many messages. This is too constraining in the general sense. If you want that kind of functionality it is rather trivial to use 2 "flows", one downstream and one upstream.
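A rough, non-normative sketch of that idea, with made-up Ack and AckingSubscriber types (and using SubmissionPublisher purely for brevity):

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Illustration only: data flows downstream as usual, while acknowledgements
// flow back on a second, reversed stream.
final class Ack {
  final long sequence;
  final Throwable failure;   // null means the item was processed successfully

  Ack(long sequence, Throwable failure) {
    this.sequence = sequence;
    this.failure = failure;
  }
}

abstract class AckingSubscriber<T> implements Flow.Subscriber<T> {
  private final SubmissionPublisher<Ack> acks = new SubmissionPublisher<>();
  private Flow.Subscription subscription;
  private long sequence;

  // The producing side subscribes here to learn the fate of each item it sent.
  Flow.Publisher<Ack> acknowledgements() { return acks; }

  @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

  @Override public void onNext(T item) {
    try {
      process(item);
      acks.submit(new Ack(sequence++, null));
    } catch (Exception x) {
      acks.submit(new Ack(sequence++, x));
    }
    subscription.request(1);
  }

  @Override public void onComplete() { acks.close(); }
  @Override public void onError(Throwable t) { acks.closeExceptionally(t); }

  protected abstract void process(T item) throws Exception;
}

Whether the producing side treats a failed Ack as fatal, or just uses it to release a pooled buffer, is then its own policy, outside the RS contract.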

 

Receiving acknowledgement of success would make it possible for a publisher to recycle published items (eg reuse buffers or put them back in a buffer pool).   But perhaps more importantly, acknowledgement of failure is very important for making robust scalable servers.

Since RS does not mandate that the Subscriber runs locally, assuming that elements are always shared without copying/marshalling is not appropriate or feasible. Also, for the "object pooling" use-case, you need to go to great lengths to avoid aliasing of the elements.

 

Our experience with asynchronous code is that making it work well is the easy part, often taking a few short hours of coding. It is making it fail well that is the really hard part, and it can take months of hard work even to understand all the failure modes, let alone deal with them properly without leaking resources, etc.


Absolutely! Failure management strategy was at the very core of the spec. (As a sidenote, I think we as an industry have been focusing waaay too much on "blue skies" solutions)
 

So with the standard RS API, we can see how you can push failure notification down a stream using Subscriber.onError(Throwable), but we can't see a standard way of sending failure notification up a stream.

Because it is not the responsibility of the Publisher to deal with the failures of the Subscriber—there is nothing it could do about it.
`onError` is the signal that travels downstream to notify the Subscriber that there will be no more elements coming, because of a failure, not because of End-Of-Stream.

 

For example, consider a Publisher connected to a chain of Processors, finishing with a final Subscriber that is actually an adapter over a servlet output stream. Somewhere in that chain there will be a processor that serializes the item objects or converts them to JSON.

Let's say there is an exception serializing one of these objects; what should the processor do? Well, it can tell its subscriber with onError(Throwable) that there was a problem, but in this case the subscriber can do very little with that knowledge other than log the throwable and close its servlet stream. But there is no standard way for the processor to tell the publisher that sent the bad object that there was a problem with it.

If serialization errors are to be considered fatal to the processing, the processor should immediately cancel its upstream subscription and signal onError downstream so that the entire pipeline can clean itself up.

If a serialization error is not to be considered fatal, you have two choices:

A) Represent "errors" (i.e. non-fatal situations) in the element type downstream—this means that the stream will not have any gaps (a sketch of this option follows below).
B) Divert "errors" to a side-channel (a log, etc.)—this means that the stream will have "gaps".
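For option A, a made-up Result wrapper along these lines is all that is needed (illustrative only, not part of any spec or library):

import java.util.function.Function;

// The stream element type carries either a value or the error that replaced
// it, so the stream itself never has gaps.
final class Result<T> {
  private final T value;
  private final Throwable error;

  private Result(T value, Throwable error) { this.value = value; this.error = error; }

  static <T> Result<T> ok(T value)         { return new Result<>(value, null); }
  static <T> Result<T> failed(Throwable x) { return new Result<>(null, x); }

  boolean isOk()    { return error == null; }
  T value()         { return value; }
  Throwable error() { return error; }

  // Wrap a possibly-failing step so a Processor can emit Result<R> instead of throwing.
  static <T, R> Function<T, Result<R>> lifting(Function<T, R> step) {
    return t -> {
      try { return ok(step.apply(t)); }
      catch (Exception x) { return failed(x); }
    };
  }
}

A serializing processor whose output element type is Result<ByteBuffer> can then emit Result.failed(x) for the bad element and keep going, and the final Subscriber decides whether that deserves teardown or just a log line.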
 

It can call Subscription.cancel(), but that does not provide any information about what failed. It can't just throw an exception because, in an async environment, the processing of the passed objects may well end up in the calling stack of the servlet container calling onWritePossible, which calls subscription.request(n). Thus a thrown exception may end up going down the stream rather than up.


The spec prescribes where exceptions are and aren't allowed to be thrown: https://github.com/reactive-streams/reactive-streams-jvm#specification
 

So why is onError on the Subscriber and cancel on Subscription?

The Publisher cannot do anything about -why- the Subscriber is unable to continue; onError is a fast-path teardown signal whereas onComplete is an ordered teardown.

For our initial thinking on error handling it should be the other way around, so that exceptions go upstream, not down. Our rough early thinking is that it would make more sense to have:

interface Publisher<T> {
  void subscribe(Subscriber<T> s);
}

interface Subscriber<T> {
  void onSubscription(Subscription<T> s);
  void onNext(T item);
  void onCompleted();
  void cancel();
}

interface Subscription<T> {
  void request(long n);
  void acknowledge(T item);
  void onError(Throwable x);
}

Thus exceptions would propagate upstream and cancellation would go downstream, which is the opposite of what the API has today.  

If I understand this proposal correctly, it means that Subscribers will never know if there's an EOS or Error upstream?
 

I've also added an upstream acknowledge(T item) to support success notification, which would allow transactions to be closed, buffers to be recycled, etc.

This not only means that traffic will double (sending items both back and forth across the medium), but it also means that Publishers will need to retain references to all elements they send that have not been acknowledged, which will have quite drastic effects on memory consumption, especially when the element traverses a binary boundary (serialization/marshalling).

Now, having just been schooled in the whole way this API works asynchronously... I'm expecting that I've perhaps not understood how error handling is intended to be used.

I hope things are clearer,
I suspect you'll find RS easier to reason about if you think of it as a protocol rather than "just" methods and interfaces.

Let's continue this discussion over at RS!
 

cheers

--
Greg Wilkins <[hidden email]>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.



--
Cheers,


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang

Noticed that I lost a sentence in editing; adding it below.

On 4 Jun 2015 12:55, "Viktor Klang" <[hidden email]> wrote:
>> Thus exceptions would propagate upstream and cancellation would go downstream, which is the opposite of what the API has today.  
>
> If I understand this proposal correctly, it means that Subscribers will never know if there's an EOS or Error upstream?

Or does it mean that cancel == onError(new CancellationException)?

>  
>>
>> I've also added an upstream acknowledge(T item) to support success notification that would allow transaction to be closed, buffers to be recycled etc. etc.
>
> This not only means that traffic will double (sending items both back and forth across the medium), but it also means that Publishers will need to retain references to all elements they send that have not been acknowledged, which will have quite drastic effects on memory consumption, especially when the element traverses a binary boundary (serialization/marshalling).
>>
>> Now, having just been schooled in the whole way this API works asynchronously... I'm expecting that I've perhaps not understood how error handling is intended to be used.
>
> I hope things are clearer,
> I suspect you'll find RS easier to reason about if you think of it as a protocol rather than "just" methods and interfaces.
>
> Let's continue this discussion over at RS!
>  
>>
>> cheers
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Greg Wilkins <[hidden email]>  - an Intalio.com subsidiary
>> http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
>> http://www.webtide.com  advice and support for jetty and cometd.
>
>
>
>
> --
> Cheers,
> √



Re: jdk9 Candidate classes Flow and SubmissionPublisher

Greg Wilkins

On 4 June 2015 at 20:55, Viktor Klang <[hidden email]> wrote:
Having read your email I'll have to disagree: since you/we are talking about why RS wasn't implemented in some other way, I think it still belongs as a discussion over at RS—to keep things close to where they belong. Would you agree?

cheers

--
Greg Wilkins <[hidden email]>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest