Flow/SubmissionPublisher review

Flow/SubmissionPublisher review

Dávid Karnok
Hello. Since the class(es) and the JEPs have been updated recently, I'd like to review them again:

---------------

http://openjdk.java.net/jeps/266


"Also, one method in class Flow might benefit from additional support in java.util.streams, but it is usable as-is."

I don't see any j.u.s.Stream-related methods in v1.28 @ http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/Flow.java?view=markup or in SubmissionPublisher.

------------

Lines 89-90:

 *     periodicTask = scheduler.scheduleAtFixedRate(
 *       () -> submit(supplier.get()), 0, period, unit);

Correct me if I'm wrong, but this may leak 'this' from the constructor, and submit may end up being called before the constructor has finished initializing the final fields.

---------------

Line 830: consume()
Line 839: ConsumerSubscriber

If one cancels the CompletableFuture after subscribe() returns but before an asynchronous submit() call, the ConsumerSubscriber does not cancel its subscription until that submit() call reaches ConsumerSubscriber.onNext. (If no submit() call ever happens, the SubmissionPublisher will leak whatever is captured through the Consumer instance.) What I'd do is to call

status.whenComplete((v, e) -> subscription.cancel());

in onSubscribe(), which should issue a cancel the moment status is completed manually.
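
As a rough illustration of that suggestion, here is a minimal sketch. CancellingConsumerSubscriber is a hypothetical stand-in for the JDK-internal ConsumerSubscriber, not its actual source, and the hand-written Subscription in the demo only records whether cancel() was called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the internal ConsumerSubscriber (assumption, not JDK code).
final class CancellingConsumerSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<Void> status;
    final Consumer<? super T> consumer;

    CancellingConsumerSubscriber(CompletableFuture<Void> status,
                                 Consumer<? super T> consumer) {
        this.status = status;
        this.consumer = consumer;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        // Cancel the subscription as soon as status completes for any reason:
        // normally, exceptionally, or through an external status.cancel(...).
        status.whenComplete((v, e) -> s.cancel());
        if (!status.isDone())
            s.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) {
        try {
            consumer.accept(item);
        } catch (Throwable ex) {
            status.completeExceptionally(ex); // also triggers the cancel above
        }
    }

    @Override public void onError(Throwable ex) { status.completeExceptionally(ex); }
    @Override public void onComplete() { status.complete(null); }
}

final class CancelOnCompleteDemo {
    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean();
        // Fake subscription that only records cancellation.
        Flow.Subscription fake = new Flow.Subscription() {
            public void request(long n) { }
            public void cancel() { cancelled.set(true); }
        };
        CompletableFuture<Void> status = new CompletableFuture<>();
        new CancellingConsumerSubscriber<String>(status, x -> { }).onSubscribe(fake);
        status.cancel(true); // no onNext has been delivered at this point
        System.out.println("cancelled without any onNext: " + cancelled.get());
    }
}
```

The callback registered with whenComplete runs on whichever thread completes status, so the cancel no longer has to wait for the next item to arrive.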



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Flow/SubmissionPublisher review

Paul Sandoz

On 24 Sep 2015, at 10:13, Dávid Karnok <[hidden email]> wrote:

> Hello. Since the class(es) and the JEPs have been updated recently, I'd like to review them again:
>
> ---------------
>
> "Also, one method in class Flow might benefit from additional support in java.util.streams, but it is usable as-is."
>
> I don't see any j.u.s.Stream-related methods in v1.28 @ http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/Flow.java?view=markup or in SubmissionPublisher.


The stream-based methods were removed because we were not satisfied with the current implementation approaches. We might be able to do something better at some point in the future. Before the JEP is targeted, I will update the dependency section.

Paul.


Re: Flow/SubmissionPublisher review

Doug Lea
In reply to this post by Dávid Karnok
On 09/24/2015 04:13 AM, Dávid Karnok wrote:
> Hello. Since the class(es) and the JEPs have been updated recently, I'd like to
> review them again:

Thanks!

>
> ---------------
>
> http://openjdk.java.net/jeps/266
>
> "Also, one method in class Flow might benefit from additional support in
> java.util.streams, but it is usable as-is."
>
> I don't see any j.u.s.Stream-related methods in v1.28 @

As Paul mentioned, these got triaged out, at least for the near term.
Without further j.u.s.Stream integration, the implementation
options are either to wait for the publisher to complete, or to
treat it as an IO-like item-by-item blocking stream,
neither of which is what people normally want or expect.
We felt that it was better not to introduce this or
related methods into the JDK until something better can be done.
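
To make the first of those two options concrete, here is a minimal sketch of "wait for the publisher to complete". CollectingSubscriber and CollectThenStreamDemo are hypothetical names used only for illustration; this is not the removed Flow method, just one way such a bridge could look.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical helper (not a JDK API): buffers every item and
// exposes the buffer only once onComplete has been signalled.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> items = new ArrayList<>();
    final CompletableFuture<List<T>> result = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable ex) { result.completeExceptionally(ex); }
    @Override public void onComplete() { result.complete(items); }
}

final class CollectThenStreamDemo {
    static int sumOfSquares() {
        CollectingSubscriber<Integer> collector = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(collector);
            for (int i = 1; i <= 3; i++)
                publisher.submit(i);
        } // close() issues onComplete once buffered items have been delivered

        // join() blocks until the publisher has completed; only then can
        // the Stream pipeline start, which is the drawback described above.
        return collector.result.join().stream().mapToInt(i -> i * i).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares());
    }
}
```

Nothing flows into the Stream until the last item has been published, so an unbounded or long-lived publisher would never yield a result this way.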

> Lines 89-90:
>
>   *     periodicTask = scheduler.scheduleAtFixedRate(
>   *       () -> submit(supplier.get()), 0, period, unit);
>
> Correct me if I'm wrong, but this may leak 'this' from the constructor and
> submit may end up being called before the constructor sealed the final fields.
>

I would ordinarily agree that an example that starts a service or
thread in a constructor is too delicate for a javadoc example.
But this usage is convenient and OK because of the intrinsic
properties of ScheduledThreadPoolExecutor (all writes before
first thread creation happen-before reads by that thread, etc).
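
For code that would rather not depend on those executor guarantees, one alternative is to start the periodic task only after construction has finished. The sketch below is a hypothetical variant of the javadoc example (FactoryPeriodicPublisher and its start method are assumed names, not part of jsr166), shown only to illustrate the trade-off.

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

// Hypothetical variant of the javadoc's PeriodicPublisher example (assumption).
final class FactoryPeriodicPublisher<T> extends SubmissionPublisher<T> {
    private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
    private volatile ScheduledFuture<?> periodicTask;

    private FactoryPeriodicPublisher() { super(); }

    static <T> FactoryPeriodicPublisher<T> start(Supplier<? extends T> supplier,
                                                 long period, TimeUnit unit) {
        FactoryPeriodicPublisher<T> p = new FactoryPeriodicPublisher<>();
        // The constructor has already returned here, so the scheduled task
        // can only ever observe a fully constructed publisher.
        p.periodicTask = p.scheduler.scheduleAtFixedRate(
            () -> p.submit(supplier.get()), 0, period, unit);
        return p;
    }

    @Override public void close() {
        ScheduledFuture<?> task = periodicTask;
        if (task != null)
            task.cancel(false);
        scheduler.shutdown();
        super.close();
    }
}

final class FactoryPeriodicPublisherDemo {
    // Returns the first published item, or fails after five seconds.
    static String firstItem() {
        CompletableFuture<String> first = new CompletableFuture<>();
        FactoryPeriodicPublisher<String> p =
            FactoryPeriodicPublisher.start(() -> "tick", 10, TimeUnit.MILLISECONDS);
        try {
            p.consume(first::complete);
            return first.orTimeout(5, TimeUnit.SECONDS).join();
        } finally {
            p.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstItem());
    }
}
```

The cost is a factory method and a non-final periodicTask field, which is less compact than the constructor-based form in the javadoc.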

> Line 830: consume() ...
> What I'd do is to call
>
> status.whenComplete((v, e) -> subscription.cancel());

Good idea; thanks!

-Doug


