jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea

Here's the only set of candidates for new jdk9 j.u.c classes:

As discussed a few months ago, there is no single best fluent
async/parallel API. CompletableFuture/CompletionStage best supports
continuation-style programming on futures, and java.util.stream best
supports (multi-stage, possibly-parallel) "pull" style operations on
the elements of collections. Until now, one missing category was
"push" style operations on items as they become available from an
active source. We are not alone in wanting a standard way to support
this. Over the past year, the "reactive-streams"
(http://www.reactive-streams.org/) effort has been defining a minimal
set of interfaces expressing commonalities and allowing
interoperability across frameworks (including Rx and Akka Play), that
is nearing release. These interfaces include provisions for a simple
form of async flow control allowing developers to address resource
control issues that can otherwise cause problems in push-based
systems. Supporting this mini-framework helps avoid unpleasant
surprises possible when trying to use pull-style APIs for "hot"
reactive sources (but conversely is not as good a choice as
java.util.stream for "cold" sources like collections).

The four intertwined interfaces (Publisher, Subscriber, Subscription,
Processor) are defined within the same class "Flow", that also
includes the first of some planned support methods to establish and
use Flow components, including tie-ins to java.util.stream and
CompletableFutures. See
http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/Flow.html

(Logistically, the only alternative to class Flow would have been
to introduce a subpackage, which unnecessarily complicates usage. And
"import static java.util.concurrent.Flow.*;" is about as convenient as
"import java.util.concurrent.flow.*;" would be.)

Also, the new stand-alone class SubmissionPublisher can serve as a
bridge from various kinds of item producers to Flow components, and is
useful in its own right. It is a form of ordered multicaster that
people have complained that we don't support in j.u.c, but now do.
See
http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/SubmissionPublisher.html

Disclaimers: These are only candidates for inclusion.  They are in
preliminary form and will change. But comments and suggestions would
be welcome. As with the other candidate additions, if you are brave,
you can try out snapshots on jdk8+ by getting
   http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166.jar
and running java -Xbootclasspath/p:jsr166.jar

-Doug
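
The "ordered multicaster" role described above can be sketched roughly as follows. This is only an illustration written against java.util.concurrent.Flow and SubmissionPublisher as they later shipped in JDK 9; the candidate snapshot discussed in this post may have differed. MulticastSketch and Recorder are hypothetical names introduced for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class MulticastSketch {
    /** Hypothetical subscriber that records every item in arrival order. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE); // effectively unbounded demand
        }
        @Override public void onNext(Integer item) { seen.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..5 to two subscribers and returns what each one saw. */
    static List<List<Integer>> run() {
        Recorder first = new Recorder();
        Recorder second = new Recorder();
        // The no-arg constructor delivers items asynchronously.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // offered to every current subscriber
            }
        } // close() issues onComplete after the buffered items are delivered
        try {
            first.done.await();
            second.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of(first.seen, second.seen);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each subscriber gets its own buffer, so both see the same items in submission order even though delivery runs on other threads.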

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: jdk9 Candidate classes Flow and SubmissionPublisher

Kasper Nielsen-4


On Thu, Jan 15, 2015 at 6:25 PM, Doug Lea <[hidden email]> wrote:

> Also, the new stand-alone class SubmissionPublisher can serve as a
> bridge from various kinds of item producers to Flow components, and is
> useful in its own right. It is a form of ordered multicaster that
> people have complained that we don't support in j.u.c, but now do.
> See
> http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/SubmissionPublisher.html


I don't know if it is an oversight, but shouldn't SubmissionPublisher allow for the cancellation of subscriptions?
For example, by letting SubmissionPublisher.subscribe() return a subscription object you can use?

- Kasper


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea
On 01/15/2015 02:14 PM, Kasper Nielsen wrote:
> I don't know if it is an oversight, but shouldn't SubmissionPublisher allow for the
> cancellation of subscriptions?
> For example, by letting SubmissionPublisher.subscribe() return a subscription
> object you can use?

Publisher.subscribe doesn't return a subscription; instead it invokes
the subscriber's onSubscribe with one, and cancellation goes through
that object. See the usage example (pasted from javadoc) below
for Subscriber side usage.

The APIs have a few tradeoffs like this that make them usable
across both concurrent and distributed contexts. It would be
possible to add a second form of subscribe to the implementation class
SubmissionPublisher, that would sometimes be more convenient, but also
more confusing, so probably not worthwhile.


      import java.util.concurrent.Flow.Subscriber;
      import java.util.concurrent.Flow.Subscription;
      import java.util.function.Consumer;

      class SampleSubscriber<T> implements Subscriber<T> {
        final Consumer<? super T> consumer;
        Subscription subscription;
        final long requestSize;
        long count; // items left before the next request is issued
        SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
          this.requestSize = requestSize;
          this.consumer = consumer;
        }
        public void onSubscribe(Subscription subscription) {
          count = requestSize / 2; // re-request when half consumed
          // keep the subscription: it is the handle for request() and cancel()
          (this.subscription = subscription).request(requestSize);
        }
        public void onNext(T item) {
          if (--count <= 0)
            subscription.request(count = requestSize);
          consumer.accept(item);
        }
        public void onError(Throwable ex) { ex.printStackTrace(); }
        public void onComplete() {}
      }
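
To connect this back to the cancellation question: a subscriber can cancel through the Subscription it receives in onSubscribe. The following is a minimal sketch, assuming the Flow and SubmissionPublisher API as it later shipped in JDK 9 rather than the snapshot under discussion. CancelSketch and TakeN are hypothetical names used only here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class CancelSketch {
    /** Hypothetical subscriber that takes at most `limit` items, then cancels. */
    static final class TakeN<T> implements Flow.Subscriber<T> {
        final int limit; // assumed to be at least 1
        final List<T> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        TakeN(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s; // the subscriber obtains the subscription here
            s.request(1);     // ask for one item at a time
        }
        @Override public void onNext(T item) {
            seen.add(item);
            if (seen.size() >= limit) {
                subscription.cancel(); // subscriber-side cancellation
                finished.countDown();
            } else {
                subscription.request(1);
            }
        }
        @Override public void onError(Throwable t) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
    }

    /** Submits five items to a subscriber that cancels after three. */
    static List<String> run() {
        TakeN<String> subscriber = new TakeN<>(3);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String s : List.of("a", "b", "c", "d", "e")) {
                publisher.submit(s);
            }
            subscriber.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the subscriber requests one item at a time and stops requesting once it cancels, it never sees more than `limit` items, and the caller of subscribe() needs no handle at all.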



Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang

What Doug said :-)

--
Cheers,


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Remi Forax
In reply to this post by Doug Lea
I think it's too soon to try to include an API like this in JDK9.
Currently, everybody seems to think they can come up with a better API for a
reactive framework.
I think it's better to wait and see.

Rémi


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea
On 01/15/2015 05:59 PM, Remi Forax wrote:
> I think it's too soon to try to include an API like this in JDK9.
> Currently, everybody seems to think they can come up with a better API for a
> reactive framework.

Where a lot of those everybodies were involved in formulating
these APIs. See the list at http://www.reactive-streams.org/

> I think it's better to wait and see.

We waited. We saw :-)

-Doug

Re: jdk9 Candidate classes Flow and SubmissionPublisher

Kasper Nielsen-4
In reply to this post by Doug Lea


On Thu, Jan 15, 2015 at 8:28 PM, Doug Lea <[hidden email]> wrote:
> Publisher.subscribe doesn't return a subscription, it instead invokes
> onSubscribe with one. See the usage example (pasted from javadoc) below
> for Subscriber side usage.
>
> The APIs have a few tradeoffs like this that make them usable
> across both concurrent and distributed contexts. It would be
> possible to add a second form of subscribe to the implementation class
> SubmissionPublisher, that would sometimes be more convenient, but also
> more confusing, so probably not worthwhile.

 
Right, I didn't get the API at first.

A couple of suggestions:
1)
Why not pass the Subscription object as a parameter to every method?
This avoids having to store it in a field.

2)
Make onComplete a default method.

3)
Rename onSubscribe/onComplete to something like onStart/onStop.
onComplete is okay, but onSubscribe does not immediately convey that it is the first method being invoked.

Ending up with something like:

interface Subscriber<T> {
  void onStart(Subscription subscription);
  void onNext(Subscription subscription, T item);
  void onError(Subscription subscription, Throwable cause);
  default void onStop(Subscription subscription) {}
}

- Kasper
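
For readers who want to try the shape suggested above without changing Flow itself, one possibility is a small adapter layered on Flow.Subscriber. This is only a sketch against the API as it later shipped in JDK 9. ContextSubscriber, adapt and ContextSubscriberSketch are hypothetical names and not part of any JDK or proposal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class ContextSubscriberSketch {
    /** Hypothetical interface with the suggested shape; not part of any JDK. */
    interface ContextSubscriber<T> {
        void onStart(Flow.Subscription subscription);
        void onNext(Flow.Subscription subscription, T item);
        void onError(Flow.Subscription subscription, Throwable cause);
        default void onStop(Flow.Subscription subscription) {}
    }

    /** Adapter that stores the subscription once and passes it to every callback. */
    static <T> Flow.Subscriber<T> adapt(ContextSubscriber<T> target) {
        return new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                target.onStart(s);
            }
            @Override public void onNext(T item) { target.onNext(subscription, item); }
            @Override public void onError(Throwable t) { target.onError(subscription, t); }
            @Override public void onComplete() { target.onStop(subscription); }
        };
    }

    /** Publishes two items through the adapter and returns what was received. */
    static List<Integer> run() {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ContextSubscriber<Integer> target = new ContextSubscriber<Integer>() {
            @Override public void onStart(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(Flow.Subscription s, Integer item) {
                seen.add(item);
                s.request(1); // no field needed in user code
            }
            @Override public void onError(Flow.Subscription s, Throwable cause) { done.countDown(); }
            @Override public void onStop(Flow.Subscription s) { done.countDown(); }
        };
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(adapt(target));
            publisher.submit(10);
            publisher.submit(20);
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The adapter keeps the four-method Flow.Subscriber contract intact for interop while giving user code the per-call subscription parameter.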


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Remi Forax
In reply to this post by Doug Lea


On 01/16/2015 12:50 AM, Doug Lea wrote:
> On 01/15/2015 05:59 PM, Remi Forax wrote:
>> I think it's too soon to try to include an API like this in JDK9.
>> Currently, everybody seems to think they can come up with a better API for
>> a reactive framework.
>
> Where a lot of those everybodies were involved in formulating
> these APIs. See the list at http://www.reactive-streams.org/

Yes, I know; having a lot of people hammering on the API is a great thing.
But why do you want to include it in the JDK?
It's better for the API to stay out of the JDK because its release cycle
is faster, and
it's better for the JDK because the API is still moving.

>
>> I think it's better to wait and see.
>
> We waited. We saw :-)

I heard about Rx in 2009, and since then I have observed that
the use cases have changed, the APIs have changed, and even the manifesto
has changed recently.
This is not a bad thing; it means the community around the reactive
stuff is vibrant, so
why do you want to stop that rush of cool ideas?

And if you think you want to freeze the API, I think a JSR is a better
vehicle.

>
> -Doug

Rémi

Re: jdk9 Candidate classes Flow and SubmissionPublisher

Dávid Karnok
I've been working in this Java reactive field for 4 years now. Having some base interfaces in the JDK itself is nice, but libraries have lived quite well with their JDK-external interfaces/classes. Unless the JDK itself capitalizes on this better observer pattern (i.e., by retrofitting Swing's event notification mechanism or NIO's async), there is not much gain; programmers can continue using the external libraries, and interop can be built as needed. (I'd like to have yield return and extension methods instead, but that's not going to happen anytime soon.)

A few questions/observations about these new classes:

- Will the executors be a strict part of the base classes? I.e., will I be able to observe a publisher synchronously? (Rx has observeOn and subscribeOn to do async on demand.)
- subscribe() has to do some enforcing, which can be quite heavy, but adding unsafeSubscribe (or safeSubscribe) means Publisher is no longer a pure functional interface.
- In Rx, we have a rule that onXXX methods can't be called while holding a lock, to avoid deadlocks. And in general, quite a lot of work went into doing atomic "magic" to be lock-free as much as possible.
- What default operators will be available on a producer? (map, merge, concat, zip, window, lift, etc.)



--
Best regards,
David Karnok
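
On the first question above (observing a publisher synchronously), one way to experiment is to hand SubmissionPublisher an Executor that runs tasks on the calling thread. This is a hedged sketch against the API as it later shipped in JDK 9, not an answer given in this thread; the snapshot under discussion may have behaved differently. SameThreadSketch is a hypothetical name, and using Runnable::run as the Executor is a choice made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class SameThreadSketch {
    /** Records where onNext runs relative to the thread calling submit(). */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Thread caller = Thread.currentThread();
        // Runnable::run acts as an Executor that runs each task on the calling thread.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE);
                }
                @Override public void onNext(Integer item) {
                    String where = Thread.currentThread() == caller
                        ? "caller thread" : "another thread";
                    log.add("onNext(" + item + ") on " + where);
                }
                @Override public void onError(Throwable t) { log.add("onError"); }
                @Override public void onComplete() { log.add("onComplete"); }
            });
            publisher.submit(1);
            log.add("submit(1) returned");
            publisher.submit(2);
            log.add("submit(2) returned");
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this executor the subscriber's callbacks run inside the submit() call, so a slow subscriber directly slows the producer. That trade-off is the usual reason to prefer an asynchronous executor.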


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea
In reply to this post by Remi Forax
On 01/15/2015 07:48 PM, Remi Forax wrote:

> On 01/16/2015 12:50 AM, Doug Lea wrote:
>> On 01/15/2015 05:59 PM, Remi Forax wrote:
>>> I think it's too soon to try to include an API like this in JDK9.
>>> Currently, everybody seems to think they can come up with a better API for a
>>> reactive framework.
>>
>> Where a lot of those everybodies were involved in formulating
>> these APIs. See the list at http://www.reactive-streams.org/
>
> yes, I know, having a lot of people hammering the API is great thing,
> but why do you want to include that in the JDK,

Flow provides the smallest set of small interfaces (only 7 methods
total) that enables creation of j.u.c implementation components like
SubmissionPublisher to serve this audience. Choosing these
APIs in particular (even if some people don't like the names
of some of the methods) seems to maximize impact.

There are no current plans to recreate full frameworks.
And for now, these are just candidates. If they don't
turn out to be helpful, they won't make it into jdk9.

-Doug


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang
In reply to this post by Remi Forax
Hi Rémi,

I'll try to address your concerns below:

On Fri, Jan 16, 2015 at 1:48 AM, Remi Forax <[hidden email]> wrote:

> yes, I know, having a lot of people hammering the API is great thing,
> but why do you want to include that in the JDK,
> it's better for the API to stay out of the JDK because the release cycle is faster,
> it's better for the JDK because the API is still moving.

The RS interfaces and method signatures have not seen an update in a long time and are considered done (RS is at 1.0.0.RC1, with an RC2 to ship soon containing spec clarifications and improvements to the TCK; 1.0.0.final is expected shortly thereafter).
 



> I heard about Rx in 2009, and since that I have observed two things,
> the use cases have changed, the APIs have changed, even the manifesto has changed recently.
> This is not a bad thing, it means the community around the reactive stuff is vibrant so
> why do you want to stop that rush of cool ideas.

RS is specifically for interop: a common vocabulary and semantics for building asynchronous, non-blocking streaming transformations with non-blocking back pressure. So from an API perspective it is targeted towards library developers, just like most of the things in java.util.concurrent. End user APIs are provided on top, through implementations like SubmissionPublisher or RxJava, Project Reactor, Akka Streams, Ratpack etc.
 

> And, if you think you want to freeze the API, I think a JSR is a better vehicle.

Given the target and scope of RS, it seems to me that it fits nicely within the scope of JSR166?
There is a spec, 4 pure interfaces with a sum total of 7 methods, and a TCK to validate implementations: https://github.com/reactive-streams/reactive-streams



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea
In reply to this post by Dávid Karnok
I'll try to minimize involvement in the meta-issues, but here are
some answers to...

On 01/16/2015 04:38 AM, Dávid Karnok wrote:
> A few questions/observations about these new classes:
>
> - Will the executors be a strict part of the base classes? I.e., will I be able to
> observe a publisher synchronously? (Rx has observeOn and subscribeOn to do async
> on demand.)

The only j.u.c implementations are async-only.

> - subscribe() has to do some enforcing which can be quite heavy, but adding
> unsafeSubscribe (or safeSubscribe) makes Publisher no longer a pure functional
> interface.

It does require a presence check, but this is exploited in
SubmissionPublisher to also process lazy removals, so the
aggregate cost per subscription is fairly low.

> - In Rx, we have a rule that onXXX methods can't be called while holding a lock
> to avoid deadlocks. And in general, quite a lot of work went into doing atomic
> "magic" to be lock-free as much as possible.

SubmissionPublisher is lock-free except for an outer lock to
ensure only one submitter at a time. Because most usages will
only have one submitter anyway, we use a builtin lock that
can be biased by the JVM in the common case and so is practically free.

> - What default operators will be available on a producer? (map, merge, concat,
> zip, window, lift, etc).

None are planned, but we do have a java.util.Stream tie-in:
Flow.stream(pub, streamExpr), that will require a few jdk9
internal Stream extensions to work well. In the meantime, it
collects all items before executing.
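
As a rough illustration of the async-only delivery and the single-submitter usage described above, here is a minimal sketch. It is written against SubmissionPublisher and Flow as they eventually shipped in JDK 9, which is an assumption: the snapshot under discussion in this thread may differ. The class name SubmissionPublisherSketch and the event strings are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SubmissionPublisherSketch {

    // Submits three items from the calling thread and returns what the subscriber saw.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // The no-arg constructor hands delivery to an executor, so the
        // subscriber's callbacks run asynchronously, not inside submit().
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // effectively unbounded demand
                }
                @Override public void onNext(Integer item) {
                    events.add("item " + item);
                }
                @Override public void onError(Throwable t) {
                    events.add("error");
                    done.countDown();
                }
                @Override public void onComplete() {
                    events.add("complete");
                    done.countDown();
                }
            });

            // A single submitter, the common case mentioned above.
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete to current subscribers

        try {
            // Wait for the terminal signal, since delivery happens on another thread.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The latch is only there because the caller cannot observe delivery synchronously; with an async-only publisher, waiting for the terminal signal is the caller's job.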

-Doug





Re: jdk9 Candidate classes Flow and SubmissionPublisher

Remi Forax
In reply to this post by Viktor Klang
Viktor, Doug,

I see at least 3 changes that, in my opinion, should be made to the current API.
- Subscriber.onSubscribe and onNext/onError/onComplete should not be part of the same interface.
  This will make the workflow more explicit and avoid making the subscription object mutable.

- onComplete and onError should be one method and not two,
  let's call it onComplete(Throwable throwableOrNull).
  I think it's better to say that you have two ways to complete, either normally or with an exception.
  And from my experience, the code of onError and onComplete tends to be fairly similar.

- onComplete(Throwable) should be a default method (the implementation does nothing),
  so Observer and Consumer will be equivalent in terms of function type.

  Reusing Doug's example:

  long requestSize = ...
  Observer<T> observer = consumer::accept;   // auto-conversion from a Consumer
  publisher.subscribe(subscription -> {
      long half  = requestSize / 2; // re-request when half consumed
      subscription.request(half);
      return new Observer<T>() {
          private long count = half;
          public void onNext(T item) {
              if (--count <= 0) {
                  subscription.request(count = half);
              }
              observer.onNext(item);
          }
          public void onComplete(Throwable ex) {
            observer.onComplete(ex);
          }
      };
  });

  Note that in terms of implementation, you can still have a class that implements both
  Subscriber and Observer, with onSubscribe() returning this.
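
To make the proposed shape concrete, here is a self-contained sketch of the split described above. All of it is hypothetical: the Observer and Subscriber interfaces below are one reading of the proposal, not an existing API, and the synchronous publish helper is a toy that exists only to exercise them (the j.u.c implementations discussed in this thread are async-only).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class SplitSubscriberSketch {

    interface Subscription {
        void request(long n);
        void cancel();
    }

    // Hypothetical: receives items; handling completion is optional.
    @FunctionalInterface
    interface Observer<T> {
        void onNext(T item);
        default void onComplete(Throwable throwableOrNull) { }
    }

    // Hypothetical: turns a Subscription into the Observer that will receive the items.
    @FunctionalInterface
    interface Subscriber<T> {
        Observer<T> onSubscribe(Subscription subscription);
    }

    // Toy synchronous publisher over a fixed list; it honors demand and cancellation.
    static <T> void publish(List<T> items, Subscriber<T> subscriber) {
        long[] demand = {0};
        boolean[] cancelled = {false};
        Observer<T> observer = subscriber.onSubscribe(new Subscription() {
            @Override public void request(long n) { demand[0] += n; }
            @Override public void cancel() { cancelled[0] = true; }
        });
        int delivered = 0;
        for (T item : items) {
            if (cancelled[0] || demand[0] <= 0) {
                break; // stop when cancelled or when nothing has been requested
            }
            demand[0]--;
            observer.onNext(item);
            delivered++;
        }
        if (!cancelled[0] && delivered == items.size()) {
            observer.onComplete(null); // null means normal completion in this proposal
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Consumer<Integer> consumer = i -> log.add("item " + i);
        // Compiles because onComplete has a default body, leaving one abstract method.
        Observer<Integer> observer = consumer::accept;
        long requestSize = 4; // illustrative value
        publish(Arrays.asList(1, 2, 3, 4, 5), subscription -> {
            long half = requestSize / 2; // re-request when half consumed
            subscription.request(half);
            return new Observer<Integer>() {
                private long count = half;
                @Override public void onNext(Integer item) {
                    if (--count <= 0) {
                        subscription.request(count = half);
                    }
                    observer.onNext(item);
                }
                @Override public void onComplete(Throwable ex) {
                    log.add(ex == null ? "complete" : "error");
                }
            };
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the Observer is created inside onSubscribe, the subscription is captured as a final local instead of being stored in a mutable field, which is the property the first bullet is after.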

And I'm still not convinced that this should be integrated into the JDK.

Rémi


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang
Hi Rémi,

Thanks for the great questions
(answering inline)

On Fri, Jan 16, 2015 at 4:03 PM, Remi Forax <[hidden email]> wrote:
Viktor, Doug,

I see at least 3 changes that, in my opinion, should be made to the current API.
- Subscriber.onSubscribe and onNext/onError/onComplete should not be part of the same interface.

Unfortunately that did not pan out (we did a lot of research/PoCs, and I'd be surprised if we have missed any permutation), because a failure to create a Subscription due to a failure in the Publisher requires the possibility of issuing onError without a preceding onSubscribe. We are also considering allowing fast completion (avoiding having to call onSubscribe if the stream is already completed).
 
  This will make the workflow more explicit and avoid making the subscription object mutable.

The Subscription need not be mutable right now, did you mean Subscriber?
 

- onComplete and onError should be one method and not two,
  let's call it onComplete(Throwable throwableOrNull).
  I think it's better to say that you have two ways to complete, either normally or with an exception.
  And from my experience, the code of onError and onComplete tends to be fairly similar.

That was an interesting suggestion & discussion we had quite early (1+ year ago), but we decided both that `null`-checking is an anti-pattern and that mixing failure-management code with success-management code is brittle and mixes concerns.
Also, a valid follow-on question was: where do you stop? onNext(Either<Throwable,T> elemOrCompleteOrError)? Function<Either<Throwable, T>, Void>? Function<Either<Subscription, Either<Throwable, T>>, Void>?

So it was deemed that having distinct methods made it easier to avoid NPEs, easier to separate the concerns in implementations, and easier to find what you were looking for in the javadoc, the spec, etc.
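
The trade-off can be seen side by side in a small sketch. Both interfaces below are hypothetical reductions written for this comparison: MergedTerminal stands for the proposed onComplete(Throwable throwableOrNull), and SplitTerminal for the two terminal methods of the RS Subscriber.

```java
import java.util.ArrayList;
import java.util.List;

final class TerminalSignalSketch {

    // Hypothetical merged form: one callback, where null means normal completion.
    interface MergedTerminal {
        void onComplete(Throwable throwableOrNull);
    }

    // Separate form: each outcome has its own method.
    interface SplitTerminal {
        void onError(Throwable throwable);
        void onComplete();
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The merged form has to branch on null before it can touch the argument.
        MergedTerminal merged = t -> {
            if (t == null) {
                log.add("merged: done");
            } else {
                log.add("merged: failed with " + t.getMessage());
            }
        };

        // The split form keeps success and failure handling in separate bodies.
        SplitTerminal split = new SplitTerminal() {
            @Override public void onError(Throwable throwable) {
                log.add("split: failed with " + throwable.getMessage());
            }
            @Override public void onComplete() {
                log.add("split: done");
            }
        };

        merged.onComplete(null);
        merged.onComplete(new IllegalStateException("boom"));
        split.onComplete();
        split.onError(new IllegalStateException("boom"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The merged form is a single-method interface and so works as a lambda target, which is the convenience being asked for; the split form never hands the handler a null it must remember to check.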
 
   
- onComplete(Throwable) should be a default method (the implementation does nothing),
  so Observer and Consumer will be equivalent in terms of function type.

Since the target is JDK9, onNext, onComplete and onError can definitely be default methods; the reason they currently aren't in the RS spec is that we started before Java 8 shipped, and keeping Java 6-7 compatibility made sense.
 

  Reusing Doug's example:

  long requestSize = ...
  Observer<T> observer = consumer::accept;   // auto-conversion from a Consumer
  publisher.subscribe(subscription -> {
      long half  = requestSize / 2; // re-request when half consumed
      subscription.request(half);
      return new Observer<T>() {
          private long count = half;
          public void onNext(T item) {
              if (--count <= 0) {
                  subscription.request(count = half);
              }
              observer.onNext(item);
          }
          public void onComplete(Throwable ex) {
            observer.onComplete(ex);
          }
      };
  });

  Note that in terms of implementation, you can still have a class that implements both
  Subscriber and Observer, with onSubscribe() returning this.

And I'm still not convinced that this should be integrated into the JDK.

I hope to be able to address those concerns :)
Is there anything I missed replying to?
 





--
Cheers,


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Gregg Wonderly-3
In reply to this post by Remi Forax
It’s always convenient to “wait and see”.  It’s always better to “do and adapt” when you are trying to actually accomplish something “better”.  While programming APIs can always be a problem once they are visible, the larger issue is the limitations that “nothing yet” imposes compared to “not complete”.  Of course a minimalist approach to API spec is always good early on, so that refinement is backward compatible for the steps that you can imagine.  That common APIs like this are still missing some 20 years after Java first appeared is pretty depressing.  I’ve rolled so many versions of this stuff into various applications in various places that it seems like old hat.  It would be great to have a name for something that is entirely about different pieces of software interacting with a common interface.

Gregg Wonderly


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Remi Forax


I don't think the question is whether or not the reactive-streams API is
useful as a common interface
but more why this API has to be integrated in the JDK.

cheers,
Rémi


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Remi Forax
In reply to this post by Viktor Klang

On 01/16/2015 04:53 PM, Viktor Klang wrote:
Hi Rémi,

Thanks for the great questions
(answering inline)

On Fri, Jan 16, 2015 at 4:03 PM, Remi Forax <[hidden email]> wrote:
Viktor, Doug,

I see at least 3 changes that, in my opinion, should be made to the current API.
- Subscriber.onSubscribe and onNext/onError/onComplete should not be part of the same interface.

Unfortunately that did not pan out (we did a lot of research/PoCs, and I'd be surprised if we have missed any permutation), because a failure to create a Subscription due to a failure in the Publisher requires the possibility of issuing onError without a preceding onSubscribe.

This seems plainly wrong to me: it means that you have no way to distinguish between a rejection of the subscription and an error in the process. So how can you write something useful in onError, given that at that point you can no longer distinguish those two kinds of error?




We are also considering allowing fast completion (avoiding having to call onSubscribe if the stream is already completed).
 
  This will make the workflow more explicit and avoid having a mutable subscription object

The Subscription need not be mutable right now, did you mean Subscriber?

Yes, the field that stores the subscription is mutable, which is a recipe for disaster: a Publisher like the SubmissionPublisher will expose the Subscriber, but accessing the field that stores the Subscription will lead to a race condition and an unsafe publication.

In my opinion, the Subscription object plays the same role as a Future: you can control the observer from the inside or from the outside. So this object should be created before trying to do any async work; otherwise you move the burden of accessing the Subscription object onto the user instead of the framework.

 

- onComplete and onError should be one method and not two,
  let's call it onComplete(Throwable throwableOrNull).
  I think it's better to say that you have two ways to complete, either normally or with an exception.
  And from my experience, the code of onError and onComplete tend to be fairly similar.

That was an interesting suggestion & discussion we had quite early (1+ year ago), but we decided both that `null`-checking is an anti-pattern and that mixing failure-management code with success-management code is brittle and mixes concerns.
Also, a valid follow-on question was: where do you stop? onNext(Either<Throwable,T> elemOrCompleteOrError)? Function<Either<Throwable, T>, Void>? Function<Either<Subscription, Either<Throwable, T>>, Void>?

So it was deemed that having distinct methods made it easier to avoid NPEs, easier to separate the concerns in the implementations, and easier to find what you are looking for in the javadoc, the spec, etc.

I'm not asking to mix onNext and onError (or onNext, onError, onComplete), only onError and onComplete, because in my experience the code you write there is fairly similar.
Agreed that null-checking is not great, but we have Optional now, so onComplete(Optional<Throwable>) is IMO fine.

This reminds me of a point I forgot to mention: onError should not take a Throwable as parameter;
it means that if a java.lang.Error such as OutOfMemoryError is raised, instead of bailing out, the API suggests trying to recover from this kind of error. Worse, most of the time the Error will be propagated to all the Subscribers along the chain, each of them trying to do something that may require allocating more memory.

 
   
- onComplete(Throwable) should be a default method (whose implementation does nothing),
  so Observer and Consumer will be equivalent in terms of function type.

Since the target is JDK9, onNext, onComplete and onError can definitely be default methods; the reason they currently aren't in the RS spec is that we started before Java 8 shipped, and keeping Java 6-7 compatibility made sense.

Java 6 compatibility makes sense for the reactive-streams API, not for the j.u.c.Flow API.

 

  Re-using the example of Doug, 

  long requestSize = ...
  Observer<T> observer = consumer::accept;   // auto-conversion from a Consumer
  publisher.subscribe(subscription -> {
      long half  = requestSize / 2; // re-request when half consumed
      subscription.request(half);
      return new Observer<T>() {
          private long count = half;
          public void onNext(T item) {
              if (--count <= 0) {
                  subscription.request(count = half);
              }
              observer.onNext(item);
          }
          public void onComplete(Throwable ex) {
            observer.onComplete(ex);
          }
      };
  });

  Note that in terms of implementation you can still have an implementation that implements both
  Subscriber and Observer with onSubscribe() returning this.
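
To make the shape of this proposal concrete, here is a minimal sketch of the interfaces it implies. None of these types exist in Flow or in reactive-streams: Observer, ObserverPublisher, the stand-in Subscription and the toy fromList publisher are all hypothetical names used for illustration, and the publisher is synchronous and simply stops when demand runs out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the Subscription interface discussed in the thread.
interface Subscription {
    void request(long n);
    void cancel();
}

// Hypothetical Observer: a single abstract method, so a Consumer method
// reference converts to it automatically.
@FunctionalInterface
interface Observer<T> {
    void onNext(T item);

    // Merged completion signal: null means normal completion. Does nothing by default.
    default void onComplete(Throwable throwableOrNull) { }
}

// Hypothetical publisher whose subscribe takes a function from Subscription to
// Observer, so the Subscription is captured as a lambda parameter rather than
// stored in a mutable field.
interface ObserverPublisher<T> {
    void subscribe(Function<Subscription, Observer<T>> subscriber);
}

final class ObserverSketch {
    // Synchronous toy publisher over a fixed list (an assumption for the demo).
    static <T> ObserverPublisher<T> fromList(List<T> items) {
        return subscriber -> {
            long[] demand = {0};
            Subscription s = new Subscription() {
                public void request(long n) { demand[0] += n; }
                public void cancel() { demand[0] = Long.MIN_VALUE; }
            };
            Observer<T> o = subscriber.apply(s);
            for (T item : items) {
                if (demand[0] <= 0) return; // no outstanding demand: the toy just stops
                demand[0]--;
                o.onNext(item);
            }
            o.onComplete(null);             // normal completion
        };
    }

    static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        Consumer<Integer> consumer = seen::add;
        fromList(List.of(1, 2, 3)).subscribe(subscription -> {
            subscription.request(Long.MAX_VALUE);
            return consumer::accept;        // auto-conversion from a Consumer
        });
        return seen;
    }
}
```

The default onComplete is what lets a plain Consumer stand in for an Observer; whether that convenience outweighs merging the two terminal signals is the point under debate here.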

and I'm still not convinced that this should be integrated into the JDK.

I hope to be able to address those concerns :)
Anything that I missed to reply to?

Why does the reactive-streams API have to be integrated in JDK9?
The only compelling reason I see is the one given by Doug: people will try to use Stream for that otherwise.

In my opinion, integrating the reactive-streams API is a kind of lose-lose deal:
from the reactive-streams side, the API will be set in stone, so no v2 that overcomes the shortcomings of v1;
from the JDK perspective, we have to integrate a sub-optimal API because the API has to be Java 6 compatible,
knowing that we will have to maintain forever an API that has changed several times in the past few years.

Rémi

 

Rémi


On 01/16/2015 01:59 PM, Viktor Klang wrote:
Hi Rémi,

I'll try to address your concerns below:

On Fri, Jan 16, 2015 at 1:48 AM, Remi Forax <[hidden email]> wrote:


On 01/16/2015 12:50 AM, Doug Lea wrote:
On 01/15/2015 05:59 PM, Remi Forax wrote:
I think it's too soon to try to include an API like this in JDK9.
Currently, everybody seems to think it can come with a better API for a reactive
framework.

Where a lot of those everybodies were involved in formulating
these APIs. See the list at http://www.reactive-streams.org/

yes, I know, having a lot of people hammering the API is a great thing,
but why do you want to include that in the JDK, 
it's better for the API to stay out of the JDK because the release cycle is faster, 
it's better for the JDK because the API is still moving.

The RS interfaces and method signatures have not seen an update in a long time and are considered done (RS is at 1.0.0.RC1, with an RC2 shipping soon with spec clarifications and improvements to the TCK, and 1.0.0.final expected shortly thereafter).
 



I think it's better to wait and see.

We waited. We saw :-)

I heard about Rx in 2009, and since then I have observed a few things:
the use cases have changed, the APIs have changed, even the manifesto has changed recently.
This is not a bad thing, it means the community around the reactive stuff is vibrant, so
why do you want to stop that rush of cool ideas?

RS is specifically for interop—a common vocabulary and semantics to build asynchronous, non-blocking streaming transformation with non-blocking back pressure. So from an API perspective it is targeted towards library developers, just as most of the things in java.util.concurrent. End user APIs are provided on top, through implementations like SubmissionPublisher or RxJava, Project Reactor, Akka Streams, Ratpack etc.
 

And, if you think you want to freeze the API, I think a JSR is a better vehicle.

Given the target and scope of RS, it seems to me that it fits nicely within the scope of JSR166?
There is a spec, 4 pure interfaces with a sum total of 7 methods and a TCK to validate implementations: https://github.com/reactive-streams/reactive-streams



-Doug

Rémi





Rémi


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea
On 01/17/2015 06:44 AM, Remi Forax wrote:

>> Anything that I missed to reply to?
>
> Why the reactive-streams API has to be integrated in JDK9,
> the only compelling reason I see is the one given by Doug, people will try to
> use Stream for that otherwise.

This is one of several reasons, all along the lines of:
The primary mission of j.u.c is to provide efficient, well-specified,
standardized, well-tested basic concurrency support components.
If we don't introduce some pubsub-style interfaces, then we cannot
provide components that meet requirements of a growing segment
of our primary audience of infrastructure and middleware developers
(and occasionally wider audiences). For example SubmissionPublisher
is the kind of component that several people building async systems
have complained over the past few years that we do not provide.
(The situation is somewhat analogous to the introduction of
CompletionStage/CompletableFuture.)

The particular choice of Flow/reactive-stream interfaces is the
best way I know to carry this out. The interoperable APIs are bland
and awkward enough that many people will build on top of them for
application-programmer-visible usages. We can/should include a
few of these, like the Flow.consume and Flow.stream methods,
but don't have any kind of world-domination plan for reactive
middleware systems (or any other middleware systems. We just
provide support to them, and love them all equally.)

While I'm at it: One set of uncertainties is whether/how to
provide Flow components based on async IO. We mostly stay
out of the IO business in j.u.c, in part because concurrent
IO usages may use any of the many java.io/nio APIs and
policies surrounding them just to configure usage.  For example,
it would be possible (and not very hard) to add a utility to
use a Socket (or AsynchronousSocketChannel) as a Publisher.
But doing so would require mechanisms/policies for pooling
ByteBuffers, dealing with socket options, and so on. One possibility
(in the spirit of methods like Flow.consume) is to offer a
few simple ones, that can also serve as a model for others.

-Doug


Re: jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang
In reply to this post by Remi Forax
Hi Rémi,

Thanks for your reply, I'm sorry my response got delayed. (answers inline)

On Sat, Jan 17, 2015 at 12:44 PM, Remi Forax <[hidden email]> wrote:

On 01/16/2015 04:53 PM, Viktor Klang wrote:
Hi Rémi,

Thanks for the great questions
(answering inline)

On Fri, Jan 16, 2015 at 4:03 PM, Remi Forax <[hidden email]> wrote:
Viktor, Doug,

I see at least 3 changes that in my opinion should be done on the current API.
- Subscriber.onSubscribe and onNext/onError/onComplete should not be part of the same interface.

Unfortunately that did not pan out (we did a lot of research/PoCs; I'd be surprised if we missed any permutation), because a failure to create a Subscription due to a failure in the Publisher requires the possibility of issuing onError without a preceding onSubscribe.

This seems plainly wrong to me: it means that you have no way to distinguish between a rejection of the subscription and an error in the process. So how can you write something useful in onError, given that at that point you can no longer distinguish those two kinds of error?

First of all, let me assure you that the RS spec has been thoughtfully and thoroughly put together—there's been a lot of time and hard work going into the semantics of all of the interfaces and methods.

RS Subscriber.onError is primarily for fail-fast cleanup, as such there is not a need for being able to distinguish the difference between subscription rejection (which can occur at any time) and "error in the process". I'd consider not leaking resources as a very useful thing, especially for building robust applications.

The ability to programmatically distinguish between subscription rejection and "error in the process" has low-to-no utility, as they both have the exact same consequences for the Subscriber instance. Capabilities are slightly different for Processor (see spec).






We are also considering allowing fast completion (avoiding having to call onSubscribe if the stream is already completed).
 
  This will make the workflow more explicit and avoid having a mutable subscription object

The Subscription need not be mutable right now, did you mean Subscriber?

Yes, the field that stores the subscription is mutable, which is a recipe for disaster: a Publisher like the SubmissionPublisher will expose the Subscriber, but accessing the field that stores the Subscription will lead to a race condition and an unsafe publication.

A Publisher does not expose a Subscriber (`subscribe` returns void). A Subscription can have a final-field to the Subscriber and then subsequently publish itself to the Subscriber safely:

val s = new WhateverSubscription(someSubscriber)
someSubscriber.onSubscribe(s)
 

In my opinion, the Subscription object plays the same role as a Future: you can control the observer from the inside or from the outside.

As per RS spec, it is to be used by the Subscriber.
 
So this object should be created before trying to do any async work; otherwise you move the burden of accessing the Subscription object onto the user instead of the framework.

Referring to my previous snippet/example—the Subscription is created before. In RS the Subscription is the association between a Publisher and a Subscriber, it is not to be exposed to the outside.
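
As a Java rendering of the two-line snippet above, the following sketch shows a Subscription that keeps its Subscriber in a final field and is handed over through onSubscribe. It uses the Flow interfaces as they later shipped in JDK 9, which may differ from the candidate snapshot under discussion. WhateverSubscription follows the name used in the message; its offer method and the HandOffDemo class are hypothetical additions for the demonstration, and the demand check in offer is only safe single-threaded.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Subscription holding its Subscriber in a final field, so the reference is
// fixed before the Subscription itself is handed to anyone.
final class WhateverSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> subscriber; // set once in the constructor
    private final AtomicLong demand = new AtomicLong();

    WhateverSubscription(Flow.Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
    }

    @Override public void request(long n) {
        if (n > 0) demand.addAndGet(n);
    }

    @Override public void cancel() {
        demand.set(0);
    }

    // Hypothetical helper: delivers one item if there is outstanding demand.
    // Check-then-act, so only suitable for this single-threaded sketch.
    boolean offer(T item) {
        if (demand.get() <= 0) return false;
        demand.decrementAndGet();
        subscriber.onNext(item);
        return true;
    }
}

final class HandOffDemo {
    static String run() {
        StringBuilder log = new StringBuilder();
        Flow.Subscriber<String> someSubscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                log.append("subscribed;");
                s.request(1);
            }
            @Override public void onNext(String item) { log.append("next=").append(item).append(';'); }
            @Override public void onError(Throwable t) { log.append("error;"); }
            @Override public void onComplete() { log.append("complete;"); }
        };
        // The two lines from the message above, in Java:
        WhateverSubscription<String> s = new WhateverSubscription<>(someSubscriber);
        someSubscriber.onSubscribe(s);
        s.offer("a"); // delivered: one item was requested
        s.offer("b"); // not delivered: no outstanding demand
        return log.toString();
    }
}
```

Calling HandOffDemo.run() returns "subscribed;next=a;": the Subscription exists before any signal is sent, and the Subscriber only ever receives it through onSubscribe.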
 


 

- onComplete and onError should be one method and not two,
  let's call it onComplete(Throwable throwableOrNull).
  I think it's better to say that you have two ways to complete, either normally or with an exception.
  And from my experience, the code of onError and onComplete tend to be fairly similar.

That was an interesting suggestion & discussion we had quite early (1+ year ago), but we decided both that `null`-checking is an anti-pattern and that mixing failure-management code with success-management code is brittle and mixes concerns.
Also, a valid follow-on question was: where do you stop? onNext(Either<Throwable,T> elemOrCompleteOrError)? Function<Either<Throwable, T>, Void>? Function<Either<Subscription, Either<Throwable, T>>, Void>?

So it was deemed that having distinct methods made it easier to avoid NPEs, easier to separate the concerns in the implementations, and easier to find what you are looking for in the javadoc, the spec, etc.

I'm not asking to mix onNext and onError (or onNext, onError, onComplete), only onError and onComplete, because in my experience the code you write there is fairly similar.

In my experience it is quite different (just as the code I write in a catch block typically looks different from the code I write in a try block).

A very important fact is that the semantics for onError and onComplete are quite dissimilar: onError is an ASAP signal, and onComplete is an ALAP signal.
 
Agreed that null-checking is not great, but we have Optional now, so onComplete(Optional<Throwable>) is IMO fine.


This suggestion is definitely better, but does not address the core problem of mixing concerns and dissimilar semantics, do you agree?
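
One way to see the "mixing concerns" point is to write an adapter between the two shapes. In the sketch below, MergedTerminal (the suggested onComplete(Optional<Throwable>) form), SplitTerminal and TerminalSketch are hypothetical names for illustration only; neither interface is part of reactive-streams or Flow.

```java
import java.util.Optional;

// Hypothetical merged form suggested above: a single terminal callback.
interface MergedTerminal {
    void onComplete(Optional<Throwable> failure);
}

// Separate terminal callbacks, mirroring the two terminal methods of Subscriber.
interface SplitTerminal {
    void onError(Throwable failure);
    void onComplete();
}

final class TerminalSketch {
    // Adapts split callbacks to the merged form. The branch is the code that
    // every implementor of the merged form would have to write themselves.
    static MergedTerminal merge(SplitTerminal split) {
        return failure -> {
            if (failure.isPresent()) split.onError(failure.get());
            else split.onComplete();
        };
    }

    static String demo(Optional<Throwable> outcome) {
        StringBuilder log = new StringBuilder();
        merge(new SplitTerminal() {
            @Override public void onError(Throwable failure) {
                log.append("cleanup after ").append(failure.getMessage());
            }
            @Override public void onComplete() {
                log.append("flush and close");
            }
        }).onComplete(outcome);
        return log.toString();
    }
}
```

With Optional the null check goes away, but the isPresent branch remains, so the failure path and the success path still meet inside one method.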
 
This reminds me of a point I forgot to mention: onError should not take a Throwable as parameter;
it means that if a java.lang.Error such as OutOfMemoryError is raised, instead of bailing out, the API suggests trying to recover from this kind of error. Worse, most of the time the Error will be propagated to all the Subscribers along the chain, each of them trying to do something that may require allocating more memory.

(onError is not about error recovery, it is about fail-fast cleanup of resources.)

Whether or not a Publisher implementation catches OOME (which it typically shouldn't (IMO) as per: "An Error is a subclass of Throwable that indicates serious problems that a reasonable application should not try to catch." - https://docs.oracle.com/javase/6/docs/api/java/lang/Error.html)
is not really about RS at all—the same rules apply as whenever a try-catch is written in any piece of logic.

The reason for onError to take a Throwable rather than an Exception is: there exist Throwables that aren't Errors and aren't Exceptions that Publishers would want to be able to signal downstream, and enforcing onError to take an Exception rather than Throwable wouldn't solve anything, as a Publisher who would like to pass down an OOME would just pass it down as the cause of a wrapper Exception.
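
The wrapping argument can be shown in a few lines. ExceptionOnlySubscriber and WrapSketch are hypothetical names for this illustration, and the OutOfMemoryError is constructed by hand rather than caused by real memory exhaustion.

```java
// Hypothetical callback restricted to Exception instead of Throwable.
interface ExceptionOnlySubscriber {
    void onError(Exception e);
}

final class WrapSketch {
    // A publisher that wants to signal an Error can still do so by wrapping it,
    // so narrowing the parameter type does not keep Errors out.
    static void signal(ExceptionOnlySubscriber subscriber, Throwable t) {
        if (t instanceof Exception) {
            subscriber.onError((Exception) t);
        } else {
            subscriber.onError(new RuntimeException(t)); // the Error travels as the cause
        }
    }

    static Throwable demo() {
        Throwable[] received = new Throwable[1];
        signal(e -> received[0] = e.getCause(), new OutOfMemoryError("simulated"));
        return received[0];
    }
}
```

The subscriber ends up holding the OutOfMemoryError anyway, only one getCause() call further away.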

Besides that, there is quite a bit of precedent for having Throwables in parameters:

(For even more prior art, search for "with parameters of type Throwable" in http://docs.oracle.com/javase/8/docs/api/java/lang/class-use/Throwable.html)


 
   
- onComplete(Throwable) should be a default method (whose implementation does nothing),
  so Observer and Consumer will be equivalent in terms of function type.

Since the target is JDK9, onNext, onComplete and onError can definitely be default methods; the reason they currently aren't in the RS spec is that we started before Java 8 shipped, and keeping Java 6-7 compatibility made sense.

Java 6 compatibility makes sense for the reactive-streams API, not for the j.u.c.Flow API.

Default methods would not really add value to the RS interfaces (there isn't AFAICT anything that they could do by default), and since jdk9 would be compiled with at least the Java 8 classfile format version, JDK 6-7 would still need to use a backport jar, just as with jsr166 if you want to use FJP on pre-JDK7. So this [the compatibility argument] is not a problem for RS.
 


 

  Re-using the example of Doug, 

  long requestSize = ...
  Observer<T> observer = consumer::accept;   // auto-conversion from a Consumer
  publisher.subscribe(subscription -> {
      long half  = requestSize / 2; // re-request when half consumed
      subscription.request(half);
      return new Observer<T>() {
          private long count = half;
          public void onNext(T item) {
              if (--count <= 0) {
                  subscription.request(count = half);
              }
              observer.onNext(item);
          }
          public void onComplete(Throwable ex) {
            observer.onComplete(ex);
          }
      };
  });

  Note that in terms of implementation you can still have an implementation that implements both
  Subscriber and Observer with onSubscribe() returning this.

and I'm still not convinced that this should be integrated into the JDK.

I hope to be able to address those concerns :)
Anything that I missed to reply to?

Why does the reactive-streams API have to be integrated in JDK9?
The only compelling reason I see is the one given by Doug: people will try to use Stream for that otherwise.

For Doug's compelling reasons, but also as a way for library writers and advanced users (the most common users of j.u.c) to have a common platform for interop in the JDK.
(see Gregg's comment earlier)
 

In my opinion, integrating the reactive-streams API is a kind of lose-lose deal:
from the reactive-streams side, the API will be set in stone, so no v2 that overcomes the shortcomings of v1,

This is not an issue—RS has been developed with this in mind.
 
from the JDK perspective, we have to integrate a sub-optimal API because the API has to be Java 6 compatible,

This is not an issue either—the RS as in j.u.c.Flow would not need to be Java 6 compatible since it's easy to keep a compat jar.
 
knowing that we will have to maintain forever an API that has changed several times in the past few years.

I'm not sure we're talking about the same thing here, are you talking about Rx and not RS?

Since the Reactive Streams effort started, the -only- API change to Publisher/Subscriber/Subscription/Processor has been switching from `int` to `long` for `Subscription.request`. (see: https://github.com/reactive-streams/reactive-streams/blob/e58fed62249ad6fbd36467d1bbe5c486f31a8c0e/spi/src/main/scala/asyncrx/spi/Publisher.scala)

Fortunately (well, to be honest, by design) the RS API is interfaces only—with a spec, a TCK and a set of example implementations that are not only documented but also of course pass the TCK.

With that in mind, I cannot imagine anything easier to maintain, besides not maintaining it at all. Can you?
 

Rémi


 

Rémi



Re: jdk9 Candidate classes Flow and SubmissionPublisher

Doug Lea

I updated the documentation to address some of the questions
and concerns raised so far (please keep them coming) and
added some examples. See (also pasted below):
http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/Flow.html
http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/SubmissionPublisher.html

...

public final class Flow
extends Object

Interrelated interfaces and static methods for establishing flow-controlled
components in which Publishers produce items consumed by one or more
Subscribers, each managed by a Subscription.

These interfaces correspond to the reactive-streams specification. They apply in
both concurrent and distributed asynchronous settings: All (seven) methods are
defined in void "oneway" message style. Communication entails a simple form of
flow control (method Flow.Subscription.request(long)) that can be used to avoid
resource management problems that may otherwise occur in "push" based systems.

Examples. A Flow.Publisher usually defines its own Flow.Subscription
implementation, constructing one in method subscribe and issuing it to the
calling Flow.Subscriber. It publishes items to the subscriber asynchronously,
normally using an Executor. For example, here is a very simple publisher that
only issues (when requested) a single TRUE item to any subscriber. Because each
subscriber receives only the same single item, this class does not need the
buffering and ordering control required in most implementations (for example
SubmissionPublisher).


  class OneShotPublisher implements Publisher<Boolean> {
    final Executor executor = Executors.newSingleThreadExecutor();
    public void subscribe(Subscriber<? super Boolean> subscriber) {
        subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
    }
    static class OneShotSubscription implements Subscription {
      final Subscriber<? super Boolean> subscriber;
      final Executor executor;
      boolean completed;
      OneShotSubscription(Subscriber<? super Boolean> subscriber,
                          Executor executor) {
        this.subscriber = subscriber;
        this.executor = executor;
      }
      public synchronized void request(long n) {
        if (n > 0 && !completed) {
          completed = true;
          executor.execute(() -> {
                    subscriber.onNext(Boolean.TRUE);
                    subscriber.onComplete();
                });
        }
        else if (n < 0) {
          completed = true;
          subscriber.onError(new IllegalArgumentException());
        }
      }
      public synchronized void cancel() { completed = true; }
    }
  }

A Flow.Subscriber arranges that items be requested and processed. Items
(invocations of Flow.Subscriber.onNext(T)) are not issued unless requested, but
multiple items may be requested. Many Subscriber implementations can arrange
this in the style of the following example, where a buffer size of 1
single-steps, and larger sizes usually allow for more efficient overlapped
processing with less communication; for example with a value of 64, this keeps
total outstanding requests between 32 and 64. (See also consume(long, Publisher,
Consumer) that automates a common case.) Notice that because Subscriber method
invocations for a given Flow.Subscription are strictly ordered, there is no need
for these methods to use locks or volatiles unless a Subscriber maintains
multiple Subscriptions (in which case it is preferable to instead define
multiple Subscribers, each with its own Subscription).


  class SampleSubscriber<T> implements Subscriber<T> {
    final Consumer<? super T> consumer;
    Subscription subscription;
    final long bufferSize;
    long count;
    SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
      this.bufferSize = bufferSize;
      this.consumer = consumer;
    }
    public void onSubscribe(Subscription subscription) {
      // set count before requesting, because request may trigger onNext
      count = bufferSize - bufferSize / 2; // re-request when half consumed
      (this.subscription = subscription).request(bufferSize);
    }
    public void onNext(T item) {
      if (--count <= 0)
        subscription.request(count = bufferSize - bufferSize / 2);
      consumer.accept(item);
    }
    public void onError(Throwable ex) { ex.printStackTrace(); }
    public void onComplete() {}
  }
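
The "between 32 and 64" figure can be checked with a small single-threaded trace. This is only a sketch: HalfRefillSubscriber repeats the request logic of SampleSubscriber, while DemandTrace, trace and the hand-rolled Subscription are hypothetical names introduced for illustration. Items are delivered on the calling thread, and "outstanding" is counted as requests not yet used up by a delivery.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

class DemandTrace {
    /** Same re-request logic as SampleSubscriber: refill once half is consumed. */
    static class HalfRefillSubscriber<T> implements Flow.Subscriber<T> {
        final long bufferSize;
        final Consumer<? super T> consumer;
        Flow.Subscription subscription;
        long count;
        HalfRefillSubscriber(long bufferSize, Consumer<? super T> consumer) {
            this.bufferSize = bufferSize;
            this.consumer = consumer;
        }
        public void onSubscribe(Flow.Subscription subscription) {
            count = bufferSize - bufferSize / 2;
            (this.subscription = subscription).request(bufferSize);
        }
        public void onNext(T item) {
            if (--count <= 0)
                subscription.request(count = bufferSize - bufferSize / 2);
            consumer.accept(item);
        }
        public void onError(Throwable ex) { ex.printStackTrace(); }
        public void onComplete() {}
    }

    /** Delivers the given number of items; returns {min, max} outstanding demand. */
    static long[] trace(long bufferSize, int items) {
        long[] demand = {0};                  // requested but not yet delivered
        long[] minMax = {Long.MAX_VALUE, 0};
        Flow.Subscription subscription = new Flow.Subscription() {
            public void request(long n) {
                demand[0] += n;
                minMax[1] = Math.max(minMax[1], demand[0]);
            }
            public void cancel() {}
        };
        HalfRefillSubscriber<Integer> subscriber =
            new HalfRefillSubscriber<Integer>(bufferSize, item -> {});
        subscriber.onSubscribe(subscription);
        for (int i = 0; i < items && demand[0] > 0; i++) {
            demand[0]--;                      // this item uses up one request
            minMax[0] = Math.min(minMax[0], demand[0]);
            subscriber.onNext(i);             // may top the demand back up
        }
        return minMax;
    }

    public static void main(String[] args) {
        long[] r = trace(64, 1000);
        System.out.println("min=" + r[0] + " max=" + r[1]); // prints min=32 max=64
    }
}
```

With a buffer size of 1 the same trace yields a minimum of 0 and a maximum of 1, which is the single-stepping case.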

When flow control is not needed, a subscriber may initially request an
effectively unbounded number of items, as in:


  class UnboundedSubscriber<T> implements Subscriber<T> {
    public void onSubscribe(Subscription subscription) {
      subscription.request(Long.MAX_VALUE); // effectively unbounded
    }
    public void onNext(T item) { use(item); }
    public void onError(Throwable ex) { ex.printStackTrace(); }
    public void onComplete() {}
    void use(T item) { ... }
  }


===================================

public class SubmissionPublisher<T>
extends Object
implements Flow.Publisher<T>, AutoCloseable

A Flow.Publisher that asynchronously issues submitted items to current
subscribers until it is closed. Each current subscriber receives newly submitted
items in the same order unless drops or exceptions are encountered. Using a
SubmissionPublisher allows item generators to act as Publishers, although
without integrated flow control. Instead they rely on drop handling and/or blocking.
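
As a rough illustration of basic use, the sketch below submits a few items and waits for the subscriber to see completion. It assumes the class as eventually released in JDK 9 (in particular the no-argument constructor), which may differ from this draft; SubmitDemo and publishAll are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SubmitDemo {
    /** Publishes the items to one subscriber and returns what it received. */
    static List<String> publishAll(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);
        // Assumption: no-arg constructor as in the released JDK 9 class.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item) { received.add(item); }
                public void onError(Throwable ex) { finished.countDown(); }
                public void onComplete() { finished.countDown(); }
            });
            for (String item : items)
                publisher.submit(item);   // blocks only if the buffer is saturated
        }                                 // close() leads to onComplete after buffered items
        try {
            if (!finished.await(10, TimeUnit.SECONDS))
                throw new IllegalStateException("timed out waiting for onComplete");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The subscriber is registered before the first submit, since items submitted while there are no subscribers are not retained.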

A SubmissionPublisher uses the Executor supplied in its constructor for delivery
to subscribers. The best choice of Executor depends on expected usage. If the
generator(s) of submitted items run in separate threads, and the number of
subscribers can be estimated, consider using
Executors.newFixedThreadPool(int). Otherwise consider using a work-stealing pool
(including ForkJoinPool.commonPool()).
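
A minimal sketch of supplying a fixed pool follows. It assumes the two-argument (Executor, maxBufferCapacity) constructor and the consume method of the released JDK 9 class, which may not match this draft; ExecutorChoiceDemo, deliveryThread and the thread name are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ExecutorChoiceDemo {
    /** Returns the name of the thread on which the item was delivered. */
    static String deliveryThread() {
        // Two workers, on the assumption that at most two subscribers are expected.
        ExecutorService pool = Executors.newFixedThreadPool(2, task -> {
            Thread t = new Thread(task, "delivery-worker");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<String> seenOn = new CompletableFuture<>();
        // Assumption: (Executor, maxBufferCapacity) constructor of the released class.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<Integer>(pool, 64)) {
            publisher.consume(item -> seenOn.complete(Thread.currentThread().getName()));
            publisher.submit(42);
        }
        try {
            return seenOn.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveryThread());
    }
}
```

Delivery happens on a pool thread rather than on the submitting thread, so a pool sized well below the number of active subscribers may delay some of them.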

Buffering allows producers and consumers to transiently operate at different
rates. Each subscriber uses an independent buffer. Buffers are created upon
first use with a given initial capacity, and are resized as needed up to the
maximum. (Capacity arguments may be rounded up to powers of two.) Invocations of
Flow.Subscription.request(long) do not directly result in buffer expansion, but
risk saturation if unfulfilled requests exceed the maximum capacity. Choices of
buffer parameters depend on expected rates, resources, and usages, and usually
benefit from empirical testing. As first guesses, consider an initial capacity
of 8 and a maximum of 1024.

Publication methods support different policies about what to do when buffers are
saturated. Method submit(T) blocks until resources are available. This is
simplest, but least responsive. The offer methods may drop items (either
immediately or with bounded timeout), but provide an opportunity to interpose a
handler and then retry.
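
The difference can be sketched with a deliberately stalled consumer and a tiny buffer. This assumes the offer(item, onDrop) and consume methods and the (Executor, maxBufferCapacity) constructor of the released JDK 9 class, which may differ from this draft; OfferDemo and offerAll are hypothetical names, and the exact number of drops depends on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class OfferDemo {
    /** Offers items to a stalled consumer; returns {delivered, dropped}. */
    static int[] offerAll(int items) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger delivered = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();
        // Assumption: maximum buffer capacity of 4 per subscriber.
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>(pool, 4);
        try {
            CompletableFuture<Void> done = publisher.consume(item -> {
                try {
                    gate.await();             // stall until all offers have been made
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                delivered.incrementAndGet();
            });
            for (int i = 0; i < items; i++) {
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;             // give up on this item, do not retry
                });
            }
            gate.countDown();
            publisher.close();
            done.get(10, TimeUnit.SECONDS);   // wait until buffered items are consumed
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            publisher.close();
            pool.shutdown();
        }
        return new int[] { delivered.get(), dropped.get() };
    }

    public static void main(String[] args) {
        int[] r = offerAll(100);
        System.out.println("delivered=" + r[0] + " dropped=" + r[1]);
    }
}
```

Replacing offer with submit in this sketch would deadlock rather than drop, because the consumer is only released after the loop finishes; that is the sense in which submit is the least responsive policy.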

If any Subscriber method throws an exception, its subscription is cancelled. If
the supplied Executor throws RejectedExecutionException (or any other
RuntimeException or Error) when attempting to execute a task, or a drop handler
throws an exception when processing a dropped item, then the exception is
rethrown. In these cases, some but not all subscribers may have received items.
It is usually good practice to closeExceptionally in these cases.
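
One way this advice might look in practice is sketched below. The failure is simulated with an explicit throw standing in for an exception rethrown by a publication method; the consume method and no-argument constructor are those of the released JDK 9 class and may not match this draft, and CloseExceptionallyDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CloseExceptionallyDemo {
    /** Returns the message of the error that reached the subscriber. */
    static String failureSeenBySubscriber() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        CompletableFuture<Void> done = publisher.consume(item -> {});
        try {
            publisher.submit("first");
            // Stand-in for an exception rethrown by submit or offer.
            throw new IllegalStateException("source failed");
        } catch (RuntimeException ex) {
            publisher.closeExceptionally(ex); // tell every subscriber via onError
        }
        try {
            done.get(10, TimeUnit.SECONDS);
            return "completed normally";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failureSeenBySubscriber()); // prints source failed
    }
}
```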

This class may also serve as a convenient base for subclasses that generate
items, and use the methods in this class to publish them. For example, here is a
class that periodically publishes the items generated from a supplier. (In
practice you might add methods to independently start and stop generation, to
share schedulers among publishers, and so on, or instead use a
SubmissionPublisher as a component rather than a superclass.)


  class PeriodicPublisher<T> extends SubmissionPublisher<T> {
    final ScheduledFuture<?> periodicTask;
    final ScheduledExecutorService scheduler;
    PeriodicPublisher(Executor executor, int initialBufferCapacity,
                      int maxBufferCapacity, Supplier<? extends T> supplier,
                      long period, TimeUnit unit) {
      super(executor, initialBufferCapacity, maxBufferCapacity);
      scheduler = new ScheduledThreadPoolExecutor(1);
      periodicTask = scheduler.scheduleAtFixedRate(
        () -> submit(supplier.get()), 0, period, unit);
    }
    public void close() {
      periodicTask.cancel(false);
      scheduler.shutdown();
      super.close();
    }
  }
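
The alternative mentioned above, using a SubmissionPublisher as a component with a separate start method, might be sketched as follows. PeriodicSource, start and firstItems are hypothetical names, and the no-argument SubmissionPublisher constructor is that of the released JDK 9 class, which may differ from this draft.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PeriodicSource<T> implements Flow.Publisher<T>, AutoCloseable {
    private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final Supplier<? extends T> supplier;
    private final long period;
    private final TimeUnit unit;
    private ScheduledFuture<?> task;

    PeriodicSource(Supplier<? extends T> supplier, long period, TimeUnit unit) {
        this.supplier = supplier;
        this.period = period;
        this.unit = unit;
    }
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        publisher.subscribe(subscriber);      // delegate to the component
    }
    synchronized void start() {               // generation starts only on request
        if (task == null)
            task = scheduler.scheduleAtFixedRate(
                () -> publisher.submit(supplier.get()), 0, period, unit);
    }
    public synchronized void close() {
        if (task != null) task.cancel(false);
        scheduler.shutdown();
        publisher.close();
    }

    /** Demo: collects the first n values of a counter published every 10 ms. */
    static List<Integer> firstItems(int n) {
        AtomicInteger next = new AtomicInteger();
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch enough = new CountDownLatch(n);
        try (PeriodicSource<Integer> source = new PeriodicSource<Integer>(
                 next::getAndIncrement, 10, TimeUnit.MILLISECONDS)) {
            source.subscribe(new Flow.Subscriber<Integer>() {
                public void onSubscribe(Flow.Subscription s) { s.request(n); }
                public void onNext(Integer item) { seen.add(item); enough.countDown(); }
                public void onError(Throwable ex) {}
                public void onComplete() {}
            });
            source.start();                   // subscribe first so no item is missed
            if (!enough.await(10, TimeUnit.SECONDS))
                throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(firstItems(3));    // prints [0, 1, 2]
    }
}
```

Keeping the publisher private means callers cannot invoke submit or offer directly, which is one reason to prefer composition here.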

Here is an example of a Flow.Processor implementation (using single-step
requests to its publisher, for simplicity of illustration):


  class TransformProcessor<S,T> extends SubmissionPublisher<T>
    implements Flow.Processor<S,T> {
    final Function<? super S, ? extends T> function;
    Flow.Subscription subscription;
    TransformProcessor(Executor executor, int initialBufferCapacity,
                       int maxBufferCapacity,
                       Function<? super S, ? extends T> function) {
      super(executor, initialBufferCapacity, maxBufferCapacity);
      this.function = function;
    }
    public void onSubscribe(Flow.Subscription subscription) {
      (this.subscription = subscription).request(1);
    }
    public void onNext(S item) {
      subscription.request(1);
      submit(function.apply(item));
    }
    public void onError(Throwable ex) { closeExceptionally(ex); }
    public void onComplete() { close(); }
  }
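
To show how such a processor sits between a source and a consumer, here is a small pipeline sketch. MapProcessor is a compact stand-in for TransformProcessor that relies on the no-argument superclass constructor of the released JDK 9 class (an assumption, since this draft uses a three-argument one); PipelineDemo and timesTen are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class PipelineDemo {
    /** Stand-in for TransformProcessor, using default executor and buffer size. */
    static class MapProcessor<S, T> extends SubmissionPublisher<T>
            implements Flow.Processor<S, T> {
        private final Function<? super S, ? extends T> function;
        private Flow.Subscription subscription;
        MapProcessor(Function<? super S, ? extends T> function) {
            this.function = function;
        }
        public void onSubscribe(Flow.Subscription subscription) {
            (this.subscription = subscription).request(1);
        }
        public void onNext(S item) {
            subscription.request(1);
            submit(function.apply(item));
        }
        public void onError(Throwable ex) { closeExceptionally(ex); }
        public void onComplete() { close(); }
    }

    /** Sends the input through source -> processor -> consumer. */
    static List<Integer> timesTen(List<Integer> input) {
        List<Integer> output = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
             MapProcessor<Integer, Integer> processor =
                 new MapProcessor<Integer, Integer>(x -> x * 10)) {
            // Wire downstream first so that no transformed item is missed.
            CompletableFuture<Void> done = processor.consume(item -> output.add(item));
            source.subscribe(processor);
            for (Integer item : input)
                source.submit(item);
            source.close();                   // propagates onComplete through the chain
            done.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(timesTen(List.of(1, 2, 3))); // prints [10, 20, 30]
    }
}
```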
