SubmissionPublisher closeExceptionally() may override close()

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

SubmissionPublisher closeExceptionally() may override close()

Dávid Karnok
Hi,

The following test fails (after a few rounds for me) because a concurrent call to close() and closeExceptionally() race for the terminal condition and different subscribers may see different terminal state:


Flow.Subscriber<Integer> createSub(Object[] result, int index, CountDownLatch cdl) {
return new Flow.Subscriber<Integer>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {

}

@Override
public void onNext(Integer item) {

}

@Override
public void onError(Throwable throwable) {
result[index] = throwable;
cdl.countDown();
}

@Override
public void onComplete() {
result[index] = "complete";
cdl.countDown();
}
};
}

@Test
public void closeErrorRace() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 1000; i++) {
System.out.println("Round " + i);
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

CountDownLatch cdl = new CountDownLatch(2);

Object[] result = { null, null };

sp.subscribe(createSub(result, 0, cdl));

Flow.Subscriber<Integer> sb2 = createSub(result, 1, cdl);

Throwable ex = new RuntimeException();

AtomicInteger wip = new AtomicInteger(2);

Runnable r1 = () -> {
wip.decrementAndGet();
while (wip.get() != 0) ;

sp.closeExceptionally(ex);
sp.subscribe(sb2);
};

exec.submit(r1);

wip.decrementAndGet();
while (wip.get() != 0) ;

sp.close();

assertTrue(cdl.await(5, TimeUnit.SECONDS));

assertEquals(result[0], result[1]);
}
} finally {
exec.shutdownNow();
}
}
I'm assuming consistency is desirable here and the fix could be an if statement inside the synchronized block of closeExceptionally():

if (!closed) {
    BufferedSubscription b;
    synchronized (this) {
        b = clients;
        if (b == null) { // or if (closed) {
            return;
        }
        clients = null;
        closed = true;
        closedException = error;
    }
    // the rest
}

--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: SubmissionPublisher closeExceptionally() may override close()

Doug Lea
On 03/02/2017 03:39 PM, Dávid Karnok wrote:
> Hi,
>
> The following test fails (after a few rounds for me) because a
> concurrent call to close() and closeExceptionally() race for the
> terminal condition and different subscribers may see different terminal
> state:

Thanks for the picky testing!
I agree that the onSubscribe error status is unnecessarily racy,
and it would be nicer to be consistent, and it is easy to do so.
It's now in our sources, but given jdk9  ramp-down rules, unlikely to
make it into initial jdk9 release, especially because it is a niceness
bug.

Thanks also for the post about request method. I hadn't noticed that
somehow r-s spec got passed requiring that the response to innocuous
calls to request(0) should be error. I'll further recheck that this
was not due to some miscommunication.

-Doug



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: SubmissionPublisher closeExceptionally() may override close()

Dávid Karnok
No problem.

2017-03-03 1:47 GMT+01:00 Doug Lea <[hidden email]>:
On 03/02/2017 03:39 PM, Dávid Karnok wrote:
Hi,

The following test fails (after a few rounds for me) because a
concurrent call to close() and closeExceptionally() race for the
terminal condition and different subscribers may see different terminal
state:

Thanks for the picky testing!
I agree that the onSubscribe error status is unnecessarily racy,
and it would be nicer to be consistent, and it is easy to do so.
It's now in our sources, but given jdk9  ramp-down rules, unlikely to
make it into initial jdk9 release, especially because it is a niceness
bug.

Thanks also for the post about request method. I hadn't noticed that
somehow r-s spec got passed requiring that the response to innocuous
calls to request(0) should be error. I'll further recheck that this
was not due to some miscommunication.

-Doug



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest