Re: jdk9 Candidate classes Flow and SubmissionPublisher


Doug Lea
On 03/18/2016 02:22 PM, Marcos Boyington wrote:
> Hi Doug,
>
> So I'm a big fan of the new reactive streams interfaces coming in java9, and I
> know I'm a little bit late to the party as it's already been ratified; but we've
> been playing with the interfaces from http://www.reactive-streams.org/ (on which
> the java9 stuff is based), and we've come up with a couple of interesting issues
> which I wanted to discuss with you.

... and with other reactive-stream contributors on this list.

>
> I see the Flow classes as the equivalent of multi-value ListenableFutures (or
> CompletableFuture, in java8 terms), with the difference being that it's of
> course multi-valued.  Much like futures, a Processor<T1> can be transformed
> (element-by-element) to Processor<T2>, etc.
>
> The multi-value aspect of course brings some complications, which is where we've
> run into some problems.
>
> The first being value retainment: unlike a future, you can't just cache the
> value of a transformation, because the stream could be very large.

I think you are asking for backlog control? (see below.)

>
> The second being stream cancellation: as you probably know, ListenableFuture
> propagates cancellations (I haven't played with CompletableFuture yet so unsure
> what it does).  This means that if I cancel the result of a transformation, it
> propagates to the original source stream.
>
> Flow.Publisher of course does not have any way to do this, because there is no
> way to "cancel" the stream, only unsubscribe from it. (Cancellations are useful,
> for example, if we have transformToX -> transformToY -> make an RPC to some
> service, and we cancel transformToX).

Not all Publishers will be able to support this, but the
SubmissionPublisher utility class does, so you could use it
as a basis for classes that do this.
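To illustrate the point about SubmissionPublisher, here is a minimal sketch of the consumer side. `TakeThenCancel` and its `demo()` are hypothetical names, not part of j.u.c; only `java.util.concurrent.Flow` and `SubmissionPublisher` as shipped in JDK 9 are assumed. The Subscriber requests one item at a time and calls `Flow.Subscription.cancel()` once it has seen enough, after which the publisher stops delivering to it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of j.u.c): takes at most `limit` items, then cancels.
final class TakeThenCancel<T> implements Flow.Subscriber<T> {
    private final int limit;
    private final List<T> received = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;   // signals for one Subscription are strictly ordered

    TakeThenCancel(int limit) { this.limit = limit; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);                          // ask for one item at a time
    }

    @Override public void onNext(T item) {
        received.add(item);
        if (received.size() >= limit) {
            subscription.cancel();             // "had enough": the publisher stops delivering to us
            done.countDown();
        } else {
            subscription.request(1);
        }
    }

    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }

    // Usage: four items are submitted, but demand is never raised past the first two.
    static List<String> demo() {
        TakeThenCancel<String> sub = new TakeThenCancel<>(2);
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (String s : List.of("a", "b", "c", "d")) pub.submit(s);
            sub.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(sub.received);
    }
}
```

Because demand is requested one item at a time, the subscriber can never receive more than `limit` items, whatever the timing of the cancel.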

>
> A colleague of mine came up with a novel solution to both of these: in essence
> adding a method ~like this to Flow.Publisher:
>
> CompletableFuture<Void> start();
>
> Which solves both problems: it removes the need for caching, because one can
> make certain that all subscribers are added before calling start() (with an
> exception being thrown if a subscriber is added after start()), and it allows
> the stream to be cancelled by calling cancel() on the resulting future.

There are several policy issues here surrounding submissions that occur
before start() is invoked.

Unless you are content with throwing an exception, the publisher must
maintain a replay backlog.

I contemplated also including a ReplayablePublisher utility class that
could be used along with a SubmissionPublisher to deal with many of
these scenarios. But we didn't yet have enough experience about the
range of forms that might be needed: full replay on subscribe?
Bounded caching? Batched catchup? Ideas would be welcome.

-Doug
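One hedged reading of the "bounded caching" option is a history bounded both by item count and by age, which a hypothetical replaying publisher could hand to a late subscriber before switching it to live items. `BoundedReplayBuffer` is an illustrative name, not a j.u.c class; the clock is injected as a `LongSupplier` (for example `System::nanoTime`) so the policy can be tested deterministically.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical replay history: keeps at most maxItems items, none older than maxAge.
final class BoundedReplayBuffer<T> {
    private static final class Entry<T> {
        final T item;
        final long timestamp;
        Entry(T item, long timestamp) { this.item = item; this.timestamp = timestamp; }
    }

    private final int maxItems;
    private final long maxAge;                 // same units as the clock
    private final LongSupplier clock;          // e.g. System::nanoTime
    private final ArrayDeque<Entry<T>> entries = new ArrayDeque<>();

    BoundedReplayBuffer(int maxItems, long maxAge, LongSupplier clock) {
        if (maxItems <= 0 || maxAge <= 0) throw new IllegalArgumentException("bounds must be positive");
        this.maxItems = maxItems;
        this.maxAge = maxAge;
        this.clock = clock;
    }

    // Records a newly published item, evicting the oldest one if the count bound is exceeded.
    synchronized void add(T item) {
        entries.addLast(new Entry<>(item, clock.getAsLong()));
        if (entries.size() > maxItems) entries.removeFirst();
        evictExpired();
    }

    // The items a late subscriber would be replayed, oldest first.
    synchronized List<T> snapshot() {
        evictExpired();
        List<T> out = new ArrayList<>(entries.size());
        for (Entry<T> e : entries) out.add(e.item);
        return out;
    }

    private void evictExpired() {
        long now = clock.getAsLong();
        while (!entries.isEmpty() && now - entries.peekFirst().timestamp > maxAge) entries.removeFirst();
    }
}
```

Full replay and batched catch-up would need different structures; this only covers the bounded case.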

>
> Of course I understand this wouldn't make it to java9; perhaps it doesn't even
> make sense to add it directly to Publisher, but rather have a new
> ControllablePublisher interface for this.
>
> What are your thoughts?
> Thanks,
> Marcos

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Dávid Karnok
Hi Marcos,

2016-03-19 12:20 GMT+01:00 Doug Lea <[hidden email]>:
On 03/18/2016 02:22 PM, Marcos Boyington wrote:
Hi Doug,

So I'm a big fan of the new reactive streams interfaces coming in java9, and I
know I'm a little bit late to the party as it's already been ratified; but we've
been playing with the interfaces from http://www.reactive-streams.org/ (on which
the java9 stuff is based), and we've come up with a couple of interesting issues
which I wanted to discuss with you.

... and with other reactive-stream contributors on this list.

such as myself.
 

I see the Flow classes as the equivalent of multi-value ListenableFutures (or
CompletableFuture, in java8 terms), with the difference being that it's of
course multi-valued. 

It's a bit of a stretch but okay.
 
Much like futures, a Processor<T1> can be transformed
(element-by-element) to Processor<T2>, etc

Processor itself is some sort of transformation from T to U, element-wise. But transforming Processor<T1, T2> to Processor<T3, T4> is more involved as it requires a mapping from T3 to T1 and T2 to T4. 
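The mapping described here can be written as a wrapper. A minimal sketch, with a hypothetical `Adapters.adapt` (not a JDK method): inbound `T3` items are converted to `T1` before they reach the wrapped Processor, and its outbound `T2` items are converted to `T4` on the way to each downstream Subscriber. Subscriptions pass through unchanged, so demand and cancellation are unaffected. The demo uses a `TransformProcessor` modelled on the example in the `SubmissionPublisher` Javadoc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class Adapters {
    // Hypothetical adapter: Processor<T1, T2> viewed as Processor<T3, T4>.
    static <T1, T2, T3, T4> Flow.Processor<T3, T4> adapt(
            Flow.Processor<T1, T2> p,
            Function<? super T3, ? extends T1> in,
            Function<? super T2, ? extends T4> out) {
        return new Flow.Processor<T3, T4>() {
            public void onSubscribe(Flow.Subscription s) { p.onSubscribe(s); }
            public void onNext(T3 item) { p.onNext(in.apply(item)); }      // T3 -> T1
            public void onError(Throwable t) { p.onError(t); }
            public void onComplete() { p.onComplete(); }
            public void subscribe(Flow.Subscriber<? super T4> d) {
                p.subscribe(new Flow.Subscriber<T2>() {
                    public void onSubscribe(Flow.Subscription s) { d.onSubscribe(s); }
                    public void onNext(T2 item) { d.onNext(out.apply(item)); } // T2 -> T4
                    public void onError(Throwable t) { d.onError(t); }
                    public void onComplete() { d.onComplete(); }
                });
            }
        };
    }

    // Modelled on the TransformProcessor example in the SubmissionPublisher Javadoc.
    static final class TransformProcessor<S, T> extends SubmissionPublisher<T>
            implements Flow.Processor<S, T> {
        private final Function<? super S, ? extends T> fn;
        private Flow.Subscription subscription;
        TransformProcessor(Function<? super S, ? extends T> fn) { this.fn = fn; }
        public void onSubscribe(Flow.Subscription s) { (subscription = s).request(1); }
        public void onNext(S item) { subscription.request(1); submit(fn.apply(item)); }
        public void onError(Throwable t) { closeExceptionally(t); }
        public void onComplete() { close(); }
    }

    // Usage: an Integer -> Integer processor used as a String -> String one.
    static List<String> demo() {
        TransformProcessor<Integer, Integer> times10 = new TransformProcessor<>(x -> x * 10);
        Flow.Processor<String, String> adapted = adapt(times10, Integer::parseInt, x -> "v" + x);
        List<String> got = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        adapted.subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(String item) { got.add(item); }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        });
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(adapted);
            for (String s : List.of("1", "2", "3")) source.submit(s);
        }   // closing the source lets onComplete flow through the chain
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(got);
    }
}
```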
 

The multi-value aspect of course brings some complications, which is where we've
run into some problems. 
The first being value retainment: unlike a future, you can't just cache the
value of a transformation, because the stream could be very large.

I think you are asking for backlog control? (see below.)

I guess you mean caching where some source is evaluated once and then replayed to subscribers online/offline without triggering the evaluation all over again. Caching long streams or large data needs consideration and handling, such as bounded caching (fixed amount, fixed timespan, etc.)
 

The second being stream cancellation: as you probably know, ListenableFuture
propagates cancellations (I haven't played with CompletableFuture yet so unsure
what it does).  This means that if I cancel the result of a transformation, it
propagates to the original source stream.

Yes. 

Flow.Publisher of course does not have any way to do this, because there is no
way to "cancel" the stream, only unsubscribe from it. (Cancellations are useful,
for example, if we have transformToX -> transformToY -> make an RPC to some
service, and we cancel transformToX).

This is what the Subscription interface is for. A Publisher creates a Subscription, calls onSubscribe on the Subscriber, and then watches its Subscription (which is usually a subclass with extra behavior) to see whether the downstream Subscriber has had enough. The downstream Subscriber can then call cancel() at any time. Of course, if you chain operations, the cancellation has to be chained as well.
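To make the chaining concrete, here is a minimal single-subscriber sketch (hypothetical `MapProcessor`, not a JDK class) of an element-wise stage that hands its upstream Subscription straight to its downstream Subscriber, so a downstream `request(n)` or `cancel()` reaches the source directly. Library implementations typically do more, for example wrapping the Subscription and supporting more than one subscriber; this sketch assumes the downstream Subscriber is attached before the stage subscribes upstream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical stage: maps each element and forwards the upstream Subscription unchanged.
final class MapProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> fn;
    private final AtomicReference<Flow.Subscriber<? super R>> downstream = new AtomicReference<>();

    MapProcessor(Function<? super T, ? extends R> fn) { this.fn = fn; }

    @Override public void subscribe(Flow.Subscriber<? super R> s) {
        if (downstream.compareAndSet(null, s)) return;
        // Rule 1.9: reject a second subscriber via onSubscribe + onError, never by throwing.
        s.onSubscribe(new Flow.Subscription() {
            public void request(long n) {}
            public void cancel() {}
        });
        s.onError(new IllegalStateException("MapProcessor supports a single subscriber"));
    }

    @Override public void onSubscribe(Flow.Subscription upstream) {
        Flow.Subscriber<? super R> d = downstream.get();
        if (d == null) { upstream.cancel(); return; }   // sketch assumes downstream attached first
        d.onSubscribe(upstream);                        // passing it through chains request and cancel
    }

    @Override public void onNext(T item) { downstream.get().onNext(fn.apply(item)); }

    @Override public void onError(Throwable t) {
        Flow.Subscriber<? super R> d = downstream.get();
        if (d != null) d.onError(t);
    }

    @Override public void onComplete() {
        Flow.Subscriber<? super R> d = downstream.get();
        if (d != null) d.onComplete();
    }

    // Usage: the final subscriber cancels after two items; the cancel goes straight to the source.
    static List<String> demo() {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch finished = new CountDownLatch(1);
        MapProcessor<Integer, String> stage = new MapProcessor<>(i -> "#" + i);
        stage.subscribe(new Flow.Subscriber<String>() {
            Flow.Subscription s;
            public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
            public void onNext(String item) {
                received.add(item);
                if (received.size() == 2) { s.cancel(); finished.countDown(); } else s.request(1);
            }
            public void onError(Throwable t) { finished.countDown(); }
            public void onComplete() { finished.countDown(); }
        });
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(stage);
            for (int i = 1; i <= 5; i++) source.submit(i);
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(received);
    }
}
```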

 
Not all Publishers will be able to support this, but the
SubmissionPublisher utility class does, so you could use it
as a basis for classes that do this.


A colleague of mine came up with a novel solution to both of these: in essence
adding a method ~like this to Flow.Publisher:

CompletableFuture<Void> start();

No. I feel you misunderstand the Publisher. It represents a deferred emission of values, and if you have a Publisher, (usually) nothing happens until you subscribe to it. Thus, there is no sense in cancelling just a Publisher.
 
Which solves both problems: it removes the need for caching, because one can
make certain that all subscribers are added before calling start() (with an
exception being thrown if a subscriber is added after start()), and it allows
the stream to be cancelled by calling cancel() on the resulting future.


What you describe here is a cold-to-hot Publisher conversion, which indeed may use an extra imperative method to establish the connection. But not all Publishers need this feature. In addition, your method returns a future, which doesn't work with synchronous cancellation in a multicast use case.
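A minimal sketch of the cold-to-hot idea, assuming a hypothetical `ConnectablePublisher` (similar in spirit to RxJava 2.x's `ConnectableFlowable.connect()`, but not that API): Subscribers attach first, and an explicit `connect()` runs the underlying source once and multicasts it through a `SubmissionPublisher`. Rejecting late subscribers with `onError` is only one of the policy choices under discussion; a replay backlog is another.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical cold-to-hot adapter: subscribers attach first, connect() runs the source once.
final class ConnectablePublisher<T> implements Flow.Publisher<T> {
    private final SubmissionPublisher<T> hub = new SubmissionPublisher<>();
    private final Iterable<? extends T> source;   // the cold source, emitted once on connect()
    private boolean connected;                    // guarded by this

    ConnectablePublisher(Iterable<? extends T> source) { this.source = source; }

    @Override public void subscribe(Flow.Subscriber<? super T> s) {
        synchronized (this) {
            if (!connected) { hub.subscribe(s); return; }
        }
        // Policy choice for this sketch: late subscribers get an error, not a replay backlog.
        s.onSubscribe(new Flow.Subscription() {
            public void request(long n) {}
            public void cancel() {}
        });
        s.onError(new IllegalStateException("already connected"));
    }

    // Multicasts the source to every subscriber attached so far, then completes them.
    // submit() blocks while a subscriber's buffer is full, so slow consumers slow this loop.
    void connect() {
        synchronized (this) {
            if (connected) return;
            connected = true;
        }
        try {
            for (T item : source) hub.submit(item);
            hub.close();
        } catch (RuntimeException e) {
            hub.closeExceptionally(e);            // report a failing source to all subscribers
        }
    }

    // Usage: two subscribers attached before connect() both see the whole sequence.
    static List<List<Integer>> demo() {
        ConnectablePublisher<Integer> p = new ConnectablePublisher<>(List.of(1, 2, 3));
        CountDownLatch done = new CountDownLatch(2);
        List<List<Integer>> results = new ArrayList<>();
        for (int k = 0; k < 2; k++) {
            List<Integer> got = Collections.synchronizedList(new ArrayList<>());
            results.add(got);
            p.subscribe(new Flow.Subscriber<Integer>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(Integer item) { got.add(item); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
        }
        p.connect();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Note that `connect()` returns nothing to cancel; each subscriber still cancels through its own Subscription, which sidesteps the multicast problem with a single shared future.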
 
There are several policy issues here surrounding submissions that occur
before start() is invoked.

Unless you are content with throwing an exception, the publisher must
maintain a replay backlog.

I contemplated also including a ReplayablePublisher utility class that
could be used along with a SubmissionPublisher to deal with many of
these scenarios. But we didn't yet have enough experience about the
range of forms that might be needed: full replay on subscribe?
Bounded caching? Batched catchup? Ideas would be welcome.

-Doug

Just use Reactor-Core or RxJava 2.x (as inspiration). The issues mentioned so far are neatly handled in them. (Plus, there is also https://github.com/akarnokd/RxJavaUtilConcurrentFlow )


Of course I understand this wouldn't make it to java9; perhaps it doesn't even
make sense to add it directly to Publisher, but rather have a new
ControllablePublisher interface for this.
What are your thoughts?

I don't think Java 9 should have too many utilities over Flow, for two reasons: 1) it may take years to improve upon them, whereas libraries allow much more frequent updates; 2) reuse vs. performance, where I'm afraid many will start thinking in chains of SubmissionPublishers with mandatory async boundaries, queues and blocking.
 
Thanks,
Marcos




--
Best regards,
David Karnok


Roland Kuhn

On 19 Mar 2016, at 13:30, Dávid Karnok <[hidden email]> wrote:

Hi Marcos,

2016-03-19 12:20 GMT+01:00 Doug Lea <[hidden email]>:
On 03/18/2016 02:22 PM, Marcos Boyington wrote:
Hi Doug,

So I'm a big fan of the new reactive streams interfaces coming in java9, and I
know I'm a little bit late to the party as it's already been ratified; but we've
been playing with the interfaces from http://www.reactive-streams.org/ (on which
the java9 stuff is based), and we've come up with a couple of interesting issues
which I wanted to discuss with you.

... and with other reactive-stream contributors on this list.

such as myself.
 

I see the Flow classes as the equivalent of multi-value ListenableFutures (or
CompletableFuture, in java8 terms), with the difference being that it's of
course multi-valued.  

This is not a helpful way to look at this abstraction; you will find yourself disappointed and confused if you insist on setting out from this starting point.

A Publisher represents the ability to create a stream of elements that can be obtained asynchronously. A Subscriber represents an element stream receiver that is in full control of its incoming data rate. The Subscription is the link between these two via which the data exchange is mediated.

This means that Reactive Streams are inherently about a communication protocol, whereas composable Futures model dependency-driven evaluation of computation chains for a single value—these are very different abstractions with very different goals.
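The division of roles described above can be seen in a minimal synchronous Publisher. This sketch (hypothetical `RangePublisher`, loosely modelled on range operators in libraries such as RxJava, not their actual code) emits integers only as fast as the Subscriber requests them and stops on `cancel()`. It omits parts of the full Reactive Streams rule set, so treat it as an illustration of the protocol rather than a TCK-passing implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical synchronous Publisher of start, start+1, ..., start+count-1.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start, end;

    RangePublisher(int start, int count) { this.start = start; this.end = start + count; }

    @Override public void subscribe(Flow.Subscriber<? super Integer> s) {
        s.onSubscribe(new RangeSubscription(s, start, end));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int end;
        private int index;                                      // only touched by the draining thread
        private final AtomicLong requested = new AtomicLong();  // outstanding demand
        private volatile boolean done;                          // cancelled, failed or completed

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int index, int end) {
            this.downstream = downstream;
            this.index = index;
            this.end = end;
        }

        @Override public void request(long n) {
            if (done) return;
            if (n <= 0) {                                       // rule 3.9
                done = true;
                downstream.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            // Add demand (capped at Long.MAX_VALUE). Only the call that raises demand from zero
            // drains; a re-entrant request() from inside onNext just adds demand (rule 3.3).
            long prev = requested.getAndAccumulate(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
            if (prev == 0) drain();
        }

        private void drain() {
            long r = requested.get();
            while (true) {
                long emitted = 0;
                while (emitted != r && index < end) {
                    if (done) return;
                    downstream.onNext(index++);
                    emitted++;
                }
                if (done) return;
                if (index == end) { done = true; downstream.onComplete(); return; }
                r = requested.addAndGet(-emitted);              // demand that arrived meanwhile
                if (r == 0) return;
            }
        }

        @Override public void cancel() { done = true; }         // the drain loop notices and stops
    }

    // Usage: collect up to `take` items, requesting `batch` at a time, cancelling once `take` arrive.
    static List<Integer> demo(int count, int batch, int take) {
        List<Integer> got = new ArrayList<>();
        new RangePublisher(1, count).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription s;
            long outstanding;
            public void onSubscribe(Flow.Subscription s) { this.s = s; outstanding = batch; s.request(batch); }
            public void onNext(Integer item) {
                got.add(item);
                if (got.size() == take) { s.cancel(); return; }
                if (--outstanding == 0) { outstanding = batch; s.request(batch); }
            }
            public void onError(Throwable t) {}
            public void onComplete() {}
        });
        return got;
    }
}
```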


It's a bit of a stretch but okay.
 
Much like futures, a Processor<T1> can be transformed
(element-by-element) to Processor<T2>, etc

Processor itself is some sort of transformation from T to U, element-wise.

Small correction: a Processor consumes elements of one type (by virtue of being a Subscriber) and it produces elements of a possibly different type (by virtue of being a Publisher), but there is no prescription about the relationship between the ingested and emitted data streams—they could be entirely unrelated and they often are not 1:1 element transformations (for a very simple example consider the `filter()` combinator of any stream processing library near you).
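The `filter()` case shows why a Processor is not a 1:1 mapping: every dropped element used up one unit of the downstream's demand as far as the upstream is concerned, so the stage must ask upstream for a replacement. A minimal single-subscriber sketch (hypothetical `FilterProcessor`, not a JDK or library class), assuming the downstream Subscriber is attached before the stage subscribes upstream:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

// Hypothetical single-subscriber filter stage.
final class FilterProcessor<T> implements Flow.Processor<T, T> {
    private final Predicate<? super T> keep;
    private final AtomicReference<Flow.Subscriber<? super T>> downstream = new AtomicReference<>();
    private volatile Flow.Subscription upstream;

    FilterProcessor(Predicate<? super T> keep) { this.keep = keep; }

    @Override public void subscribe(Flow.Subscriber<? super T> s) {
        if (downstream.compareAndSet(null, s)) return;
        s.onSubscribe(new Flow.Subscription() {       // rule 1.9: signal, don't throw
            public void request(long n) {}
            public void cancel() {}
        });
        s.onError(new IllegalStateException("FilterProcessor supports a single subscriber"));
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        Flow.Subscriber<? super T> d = downstream.get();
        if (d == null) { s.cancel(); return; }        // sketch assumes downstream attached first
        upstream = s;
        d.onSubscribe(s);                             // downstream demand and cancel() pass straight through
    }

    @Override public void onNext(T item) {
        if (keep.test(item)) downstream.get().onNext(item);
        else upstream.request(1);                     // dropped item used one unit of demand: replace it
    }

    @Override public void onError(Throwable t) {
        Flow.Subscriber<? super T> d = downstream.get();
        if (d != null) d.onError(t);
    }

    @Override public void onComplete() {
        Flow.Subscriber<? super T> d = downstream.get();
        if (d != null) d.onComplete();
    }

    // Usage: downstream asks for exactly three items. Without the request(1) in onNext,
    // it would receive only 2 and then stall, because items 1 and 3 used up the demand.
    static List<Integer> demo() {
        List<Integer> got = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch three = new CountDownLatch(3);
        FilterProcessor<Integer> evens = new FilterProcessor<>(i -> i % 2 == 0);
        evens.subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription s;
            public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(3); }
            public void onNext(Integer item) {
                got.add(item);
                three.countDown();
                if (got.size() == 3) s.cancel();
            }
            public void onError(Throwable t) {}
            public void onComplete() {}
        });
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(evens);
            for (int i = 1; i <= 10; i++) source.submit(i);
            three.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(got);
    }
}
```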

But transforming Processor<T1, T2> to Processor<T3, T4> is more involved as it requires a mapping from T3 to T1 and T2 to T4. 
 

The multi-value aspect of course brings some complications, which is where we've
run into some problems. 
The first being value retainment: unlike a future, you can't just cache the
value of a transformation, because the stream could be very large.

I think you are asking for backlog control? (see below.)

I guess you mean caching where some source is evaluated once and then replayed to subscribers online/offline without triggering the evaluation all over again. Caching long streams or large data needs consideration and handling, such as bounded caching (fixed amount, fixed timespan, etc.)
 

The second being stream cancellation: as you probably know, ListenableFuture
propagates cancellations (I haven't played with CompletableFuture yet so unsure
what it does).  This means that if I cancel the result of a transformation, it
propagates to the original source stream.

Yes. 

Flow.Publisher of course does not have any way to do this, because there is no
way to "cancel" the stream, only unsubscribe from it. (Cancellations are useful,
for example, if we have transformToX -> transformToY -> make an RPC to some
service, and we cancel transformToX).

This is what the Subscription interface is for. A Publisher creates a Subscription, calls onSubscribe on the Subscriber, and then watches its Subscription (which is usually a subclass with extra behavior) to see whether the downstream Subscriber has had enough. The downstream Subscriber can then call cancel() at any time. Of course, if you chain operations, the cancellation has to be chained as well.

Exactly: the propagation of cancellation goes beyond the pure Publisher–Subscriber relationship that is specified for Reactive Streams, but by common sense all known implementations that support chains of Processors also make sure to pass along cancellation—just like completion or failure are passed in the other direction.


 
Not all Publishers will be able to support this, but the
SubmissionPublisher utility class does, so you could use it
as a basis for classes that do this.


A colleague of mine came up with a novel solution to both of these: in essence
adding a method ~like this to Flow.Publisher:

CompletableFuture<Void> start();

No. I feel you misunderstand the Publisher. It represents a deferred emission of values, and if you have a Publisher, (usually) nothing happens until you subscribe to it. Thus, there is no sense in cancelling just a Publisher.
 
Which solves both problems: it removes the need for caching, because one can
make certain that all subscribers are added before calling start() (with an
exception being thrown if a subscriber is added after start()), and it allows
the stream to be cancelled by calling cancel() on the resulting future.


What you describe here is a cold-to-hot Publisher conversion, which indeed may use an extra imperative method to establish the connection. But not all Publishers need this feature. In addition, your method returns a future, which doesn't work with synchronous cancellation in a multicast use case.
 
There are several policy issues here surrounding submissions that occur
before start() is invoked.

Unless you are content with throwing an exception, the publisher must
maintain a replay backlog.

I contemplated also including a ReplayablePublisher utility class that
could be used along with a SubmissionPublisher to deal with many of
these scenarios. But we didn't yet have enough experience about the
range of forms that might be needed: full replay on subscribe?
Bounded caching? Batched catchup? Ideas would be welcome.

-Doug

Just use Reactor-Core or RxJava 2.x (as inspiration). The issues mentioned so far are neatly handled in them. (Plus, there is also https://github.com/akarnokd/RxJavaUtilConcurrentFlow )

Since a list of libraries has been started: you may want to look at Akka Streams (possibly also its design goals or the quickstart guide) for a rather different interpretation of the user-level API that is still solidly grounded in the Reactive Streams contract.

Regards,

Roland



Of course I understand this wouldn't make it to java9; perhaps it doesn't even
make sense to add it directly to Publisher, but rather have a new
ControllablePublisher interface for this.
What are your thoughts?

I don't think Java 9 should have too many utilities over Flow, for two reasons: 1) it may take years to improve upon them, whereas libraries allow much more frequent updates; 2) reuse vs. performance, where I'm afraid many will start thinking in chains of SubmissionPublishers with mandatory async boundaries, queues and blocking.
 
Thanks,
Marcos




-- 
Best regards,
David Karnok

--
[scala-debate on 2009/10/2]
Viktor Klang: When will the days of numerical overflow be gone?
Ricky Clarkson: One second after 03:14:07 UTC on Tuesday, 19 January 2038



Doug Lea
On 03/21/2016 01:50 PM, Marcos Boyington wrote:

>
> Thanks! That makes a lot of sense; but when combining SubmissionPublisher
> with Processor is where things get interesting (and this is ~ what we're
> doing). If we transform a SubmissionPublisher<T> with a Processor<T, T2>, it
> would be nice to call close() on the resulting Processor and have it
> propagate to the source SubmissionPublisher.  Unfortunately, there's no
> shared interface which we can use here.
>
> That is ~ what I was suggesting with ControllablePublisher; an interface
> which is in essence Publisher + AutoCloseable, so that we have a shared
> interface between SubmissionPublisher and any other Publishers which are
> closeable.

As others mentioned, subscription.cancel propagates so that you
(eventually) stop getting items. I gather that you want to close here
to free up resources, but only if the source supports close().
Since you'd need to do type-testing anyway, adding an interface
wouldn't be better than:
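   // AutoCloseable.close() is declared to throw Exception, so this must declare or handle it
   void closeIfCloseable(Object p) throws Exception {
     if (p instanceof AutoCloseable) ((AutoCloseable) p).close();
   }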

And even then, I'm not sure when you'd call this, because there may be
multiple subscribers or potential future subscribers.
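For comparison with the instanceof test above, here is a sketch of what the hypothetical ControllablePublisher (Publisher plus AutoCloseable) might look like. `SubmissionPublisher` already implements both `Flow.Publisher` and `AutoCloseable`, and its `close()` throws no checked exception, so adapting it takes only an empty subclass. The names `ControllablePublisher`, `ControllableSubmissionPublisher` and `Sources` are illustrative only; the sketch does not answer the open question of when it is safe to close a source that may have other subscribers.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical interface from the discussion; not part of java.util.concurrent.
interface ControllablePublisher<T> extends Flow.Publisher<T>, AutoCloseable {
    @Override void close();   // narrowed: no checked exception
}

// SubmissionPublisher already provides subscribe(...) and close(), so this only declares the interface.
final class ControllableSubmissionPublisher<T> extends SubmissionPublisher<T>
        implements ControllablePublisher<T> {}

final class Sources {
    // With the shared interface, no type test and no checked exception are needed.
    static void closeSource(ControllablePublisher<?> p) { p.close(); }
}
```

The cost is that every closeable Publisher must opt in to the new interface, whereas the instanceof test works with any existing AutoCloseable.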

>
>
> I contemplated also including a ReplayablePublisher utility class that could
> be used along with a SubmissionPublisher to deal with many of these
> scenarios. But we didn't yet have enough experience about the range of forms
> that might be needed: full replay on subscribe? Bounded caching? Batched
> catchup? Ideas would be welcome.
>
>
> Heh, yah we're having the same type of difficulties - unfortunately I don't
> have any ideas; we've played around with full replay as well as bounded
> caching, but of course each has its disadvantages.

Right. If we knew of a single good API/class that would cover enough
of this ground, we'd probably include it, in the spirit of providing
components that do hard things well. But not.

Also...

On 03/19/2016 08:30 AM, Dávid Karnok wrote:
>
> I don't think Java 9 should have too many utilities over Flow for two
> reasons: 1) it may take years to improve upon them, whereas libraries allow
> much more frequent updates; 2) reuse vs. performance, where I'm afraid many will start
> thinking in chains of SubmissionPublishers with mandatory async boundaries,
> queues and blocking.

Right. The goal was to include only what j.u.c can do best:
basic async/queueing components that others can build on.
(We even triaged out backlog support and interop with Streams etc.)
Doing only this encounters the usual j.u.c issues about exposing components
that most application programmers will not want to use directly.

-Doug




Viktor Klang
A great conversation; I hope all questions were answered by the previous posters.

On Tue, Mar 22, 2016 at 3:49 PM, Doug Lea <[hidden email]> wrote:
On 03/21/2016 01:50 PM, Marcos Boyington wrote:


Thanks! That makes a lot of sense; but when combining SubmissionPublisher
with Processor is where things get interesting (and this is ~ what we're
doing). If we transform a SubmissionPublisher<T> with a Processor<T, T2>, it
would be nice to call close() on the resulting Processor and have it
propagate to the source SubmissionPublisher.  Unfortunately, there's no
shared interface which we can use here.

That is ~ what I was suggesting with ControllablePublisher; an interface
which is in essence Publisher + AutoCloseable, so that we have a shared
interface between SubmissionPublisher and any other Publishers which are
closeable.

As others mentioned, subscription.cancel propagates so that you
(eventually) stop getting items. I gather that you want to close here
to free up resources, but only if the source supports close().
Since you'd need to do type-testing anyway, adding an interface
wouldn't be better than:
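  // AutoCloseable.close() is declared to throw Exception, so this must declare or handle it
  void closeIfCloseable(Object p) throws Exception {
    if (p instanceof AutoCloseable) ((AutoCloseable) p).close();
  }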

And even then, I'm not sure when you'd call this, because there may be
multiple subscribers or potential future subscribers.



I contemplated also including a ReplayablePublisher utility class that could
be used along with a SubmissionPublisher to deal with many of these
scenarios. But we didn't yet have enough experience about the range of forms
that might be needed: full replay on subscribe? Bounded caching? Batched
catchup? Ideas would be welcome.


Heh, yah we're having the same type of difficulties - unfortunately I don't
have any ideas; we've played around with full replay as well as bounded
caching, but of course each has its disadvantages.

Right. If we knew of a single good API/class that would cover enough
of this ground, we'd probably include it, in the spirit of providing
components that do hard things well. But not.

Also...

On 03/19/2016 08:30 AM, Dávid Karnok wrote:

I don't think Java 9 should have too many utilities over Flow for two
reasons: 1) it may take years to improve upon them, whereas libraries allow
much more frequent updates; 2) reuse vs. performance, where I'm afraid many will start
thinking in chains of SubmissionPublishers with mandatory async boundaries,
queues and blocking.

Right. The goal was to include only what j.u.c can do best:
basic async/queueing components that others can build on.
(We even triaged out backlog support and interop with Streams etc.)
Doing only this encounters the usual j.u.c issues about exposing components
that most application programmers will not want to use directly.

-Doug







--
Cheers,
