
9b147: SubmissionPublisher.closeExceptionally doesn't call onSubscribe() in time


Dávid Karnok
I'm writing an interop library between RxJava 2 and JDK 9 Flow and ran into an odd situation.

The following test passes:

    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

    TestSubscriber<Integer> ts = FlowInterop.fromFlowPublisher(sp)
        .test();

    sp.submit(1);
    sp.submit(2);
    sp.submit(3);
    sp.submit(4);
    sp.submit(5);

    sp.close();

    ts.awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5);


but when I call closeExceptionally() instead:

    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

    TestSubscriber<Integer> ts = FlowInterop.fromFlowPublisher(sp)
        .test();

    sp.submit(1);
    sp.submit(2);
    sp.submit(3);
    sp.submit(4);
    sp.submit(5);

    sp.closeExceptionally(new IOException());

    ts.awaitDone(5, TimeUnit.SECONDS)
        .assertFailure(IOException.class, 1, 2, 3, 4, 5);

The test fails because, in this case, Flow.Subscriber.onSubscribe was not called before Flow.Subscriber.onError was invoked with the IOException:

Caused by: java.lang.NullPointerException: onSubscribe not called in proper order
    at io.reactivex.subscribers.TestSubscriber.onError(TestSubscriber.java:221)
    at hu.akarnokd.rxjava2.interop.FlowableFromFlowPublisher$FromFlowPublisherSubscriber.onError(FlowableFromFlowPublisher.java:62)
    at java.base/java.util.concurrent.SubmissionPublisher$BufferedSubscription.checkControl(SubmissionPublisher.java:1463)
    at java.base/java.util.concurrent.SubmissionPublisher$BufferedSubscription.consume(SubmissionPublisher.java:1421)
    at java.base/java.util.concurrent.SubmissionPublisher$ConsumerTask.exec(SubmissionPublisher.java:919)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1575)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:158)

Sleeping before the call to closeExceptionally() seems to make it work.
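
For anyone who wants to check their own build without RxJava on the classpath, here is a minimal sketch that uses only JDK classes. The class name SignalOrderCheck and the method recordSignals are made up for this illustration and are not part of the interop library. It records the signals a plain Flow.Subscriber receives, so the first entry shows whether onSubscribe arrived before anything else. On an affected build the first entry may be the onError signal; how many onNext entries appear is not something this sketch assumes.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not from the original post.
public class SignalOrderCheck {

    /** Submits 1..5, closes exceptionally, and returns the signals in arrival order. */
    static List<String> recordSignals() {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
        sp.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");
                s.request(Long.MAX_VALUE); // unbounded demand, like a default test subscriber
            }
            @Override public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }
            @Override public void onError(Throwable t) {
                signals.add("onError(" + t.getClass().getSimpleName() + ")");
                done.countDown();
            }
            @Override public void onComplete() {
                signals.add("onComplete");
                done.countDown();
            }
        });

        for (int i = 1; i <= 5; i++) {
            sp.submit(i);
        }
        sp.closeExceptionally(new IOException());

        try {
            // Wait for the terminal signal; give up after 5 seconds.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return signals;
    }

    public static void main(String[] args) {
        List<String> signals = recordSignals();
        System.out.println(signals);
        boolean ok = !signals.isEmpty() && "onSubscribe".equals(signals.get(0));
        System.out.println("onSubscribe arrived first: " + ok);
    }
}
```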

The relevant converter looks like this:

final class FlowableFromFlowPublisher<T> extends Flowable<T> {

    final Flow.Publisher<T> source;

    FlowableFromFlowPublisher(Flow.Publisher<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(org.reactivestreams.Subscriber<? super T> s) {
        source.subscribe(new FromFlowPublisherSubscriber<>(s));
    }

    static final class FromFlowPublisherSubscriber<T> implements Flow.Subscriber<T>, org.reactivestreams.Subscription {

        final org.reactivestreams.Subscriber<? super T> actual;

        Flow.Subscription s;

        FromFlowPublisherSubscriber(org.reactivestreams.Subscriber<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.s = subscription;
            actual.onSubscribe(this);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable throwable) {
            actual.onError(throwable);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void request(long n) {
            s.request(n);
        }

        @Override
        public void cancel() {
            s.cancel();
        }
    }
}

It looks like when closeExceptionally() is called, the protocol is not upheld: onError is delivered without a preceding onSubscribe.
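
The ordering rule in question can be made explicit with a small decorator. This is only a sketch under assumptions: OrderCheckingSubscriber and its noop() helper are hypothetical names invented here, and this is not how TestSubscriber implements its check. It wraps any Flow.Subscriber and notes every signal that shows up before onSubscribe, then forwards the signal unchanged.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

// Hypothetical decorator: records any signal that arrives before onSubscribe.
public class OrderCheckingSubscriber<T> implements Flow.Subscriber<T> {

    private final Flow.Subscriber<? super T> downstream;
    private final List<String> violations = new CopyOnWriteArrayList<>();
    private volatile boolean subscribed;

    public OrderCheckingSubscriber(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    /** Signals seen before onSubscribe, in arrival order. */
    public List<String> violations() {
        return violations;
    }

    private void check(String signal) {
        if (!subscribed) {
            violations.add(signal + " before onSubscribe");
        }
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscribed = true;
        downstream.onSubscribe(s);
    }
    @Override public void onNext(T item) {
        check("onNext");
        downstream.onNext(item);
    }
    @Override public void onError(Throwable t) {
        check("onError");
        downstream.onError(t);
    }
    @Override public void onComplete() {
        check("onComplete");
        downstream.onComplete();
    }

    /** A downstream that ignores every signal; only used for the demo. */
    public static <T> Flow.Subscriber<T> noop() {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    public static void main(String[] args) {
        OrderCheckingSubscriber<Integer> checker =
            new OrderCheckingSubscriber<>(OrderCheckingSubscriber.<Integer>noop());
        // Simulate the misbehaviour: an error signal with no prior onSubscribe.
        checker.onError(new IOException());
        System.out.println(checker.violations()); // prints [onError before onSubscribe]
    }
}
```

Passing such a wrapper to SubmissionPublisher.subscribe() in place of the real subscriber would show the violation without relying on an exception thrown from inside onError.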

--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: 9b147: SubmissionPublisher.closeExceptionally doesn't call onSubscribe() in time

Viktor Klang
Nice catch, Dávid!

On Mon, Dec 5, 2016 at 10:03 AM, Dávid Karnok <[hidden email]> wrote:
> [...]

--
Cheers,


Re: 9b147: SubmissionPublisher.closeExceptionally doesn't call onSubscribe() in time

Doug Lea
In reply to this post by Dávid Karnok

Thanks! I agree that it was inconsistent to give onError
precedence over onSubscribe. It is now changed to work in the
same way across close and closeExceptionally. (It will take a
while to get this into jdk9 builds though.)

-Doug


On 12/05/2016 04:03 AM, Dávid Karnok wrote:

> [...]
> (Repository: https://github.com/akarnokd/RxJava2Jdk9Interop )

