A race in SubmissionPublisher?

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

A race in SubmissionPublisher?

Pavel Rappo
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Dávid Karnok
Interesting. It hangs for me after ~4000 items.

For comparison, I've tried the test with my variant of the SubmissionPublisher (https://github.com/akarnokd/akarnokd-misc-java9/blob/master/src/main/java/hu/akarnokd/java9/benchmark/MulticastPublisher.java) and worked as expected in all cases.

2017-09-25 15:21 GMT+02:00 Pavel Rappo <[hidden email]>:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Doug Lea
In reply to this post by Pavel Rappo
On 09/25/2017 09:21 AM, Pavel Rappo wrote:
> Hi,
>
> I've been using SubmissionPublisher in my own publisher implementation in order
> to reuse its complex state machine that serializes invocations to subscribers.
>
> While testing my implementation I ran into what I believe might be a race
> condition in SubmissionPublisher.

Thanks for finding a use case that at first doesn't even seem legal,
but I agree should work according to spec, and breaks assumptions
about produce-consumer relations that can cause a wakeup not to be
issued. I'll post a fix at the corresponding bug report:
  https://bugs.openjdk.java.net/browse/JDK-8187947
after deciding which of a couple of ways to address.

-Doug


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Viktor Klang
In reply to this post by Pavel Rappo
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Pavel Rappo
Hi Viktor,

On Tue, Sep 26, 2017 at 12:58 PM, Viktor Klang <[hidden email]> wrote:
> there are a few spec violations in your Subscriber implementation that
> you'll want to address.

What violations are you talking about? Maybe you could list
appropriate rule numbers from the RS spec? Thank you.
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Dávid Karnok
In reply to this post by Viktor Klang
Viktor, can you be more specific? It just looks uncommon to me but otherwise legitimate. The fact that there is no cancel() call when all requested data has arrived is irrelevant from the perspective of the underlying bug in SubmissionPublisher.

Here is a variant where the Subscriber is just consuming items and counting how many items have been received. The asynchronous sender reads this count to know when to send the next value to expose the internal race.

public class SpPublishAlt {
@Test
public void test() throws Exception {
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
SpConsumer c = new SpConsumer();
sp.subscribe(c);

exec.submit(() -> {
while (c.upstream == null) ;

int i = 0;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });

while (i < SpConsumer.N) {
while (c.getAcquire() == i);
i++;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });
}
});

if (!c.cdl.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("Timed out " + c.getAcquire());
}
} finally {
exec.shutdownNow();
}
}

static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {

static final int N = 1 << 20;

final CountDownLatch cdl = new CountDownLatch(1);

volatile Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
subscription.request(N);
}

@Override
public void onNext(Object item) {
System.out.println(item);
int i = getPlain() + 1;
setRelease(i);
if (i == N) {
upstream.cancel();
cdl.countDown();
}
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
cdl.countDown();
}

@Override
public void onComplete() {
cdl.countDown();
}
}
}

2017-09-26 13:58 GMT+02:00 Viktor Klang <[hidden email]>:
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest




--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Viktor Klang
In reply to this post by Pavel Rappo
Hi Pavel,

Absolutely, here you go: :)


The above list might not be exhaustive,

Cheers,

On Tue, Sep 26, 2017 at 2:09 PM, Pavel Rappo <[hidden email]> wrote:
Hi Viktor,

On Tue, Sep 26, 2017 at 12:58 PM, Viktor Klang <[hidden email]> wrote:
> there are a few spec violations in your Subscriber implementation that
> you'll want to address.

What violations are you talking about? Maybe you could list
appropriate rule numbers from the RS spec? Thank you.



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Viktor Klang
In reply to this post by Dávid Karnok


On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <[hidden email]> wrote:
Viktor, can you be more specific? It just looks uncommon to me but otherwise legitimate. The fact that there is no cancel() call when all requested data has arrived is irrelevant from the perspective of the underlying bug in SubmissionPublisher.

Absolutely, my comment was not about the SubmissionPublisher bug. I was unsure whether the provided code was for reproduction only, or was written like that in general, in the latter case it needs a bit of adjustment to conform to spec.
 

Here is a variant where the Subscriber is just consuming items and counting how many items have been received. The asynchronous sender reads this count to know when to send the next value to expose the internal race.

public class SpPublishAlt {
@Test
public void test() throws Exception {
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
SpConsumer c = new SpConsumer();
sp.subscribe(c);

exec.submit(() -> {
while (c.upstream == null) ;

int i = 0;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });

while (i < SpConsumer.N) {
while (c.getAcquire() == i);
i++;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });
}
});

if (!c.cdl.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("Timed out " + c.getAcquire());
}
} finally {
exec.shutdownNow();
}
}

static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {

static final int N = 1 << 20;

final CountDownLatch cdl = new CountDownLatch(1);

volatile Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
subscription.request(N);
}

@Override
public void onNext(Object item) {
System.out.println(item);
int i = getPlain() + 1;
setRelease(i);
if (i == N) {
upstream.cancel();
cdl.countDown();
}
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
cdl.countDown();
}

@Override
public void onComplete() {
cdl.countDown();
}
}
}

2017-09-26 13:58 GMT+02:00 Viktor Klang <[hidden email]>:
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest




--
Best regards,
David Karnok



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Dávid Karnok
Addendum to the rules listed:

§2.5: in practice, this should not happen with reasonable reactive libraries. Depending on how you trust the Publisher, omitting such checks is not a disaster. Plus, the JavaDoc's example is also hints about plain store of the Flow.Subscription:


class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {
     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }
§2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException in general, but often you can't do much about that case. Plus again, the example above uses submit() which can also throw: http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html#submit-T-

2017-09-26 14:36 GMT+02:00 Viktor Klang <[hidden email]>:


On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <[hidden email]> wrote:
Viktor, can you be more specific? It just looks uncommon to me but otherwise legitimate. The fact that there is no cancel() call when all requested data has arrived is irrelevant from the perspective of the underlying bug in SubmissionPublisher.

Absolutely, my comment was not about the SubmissionPublisher bug. I was unsure whether the provided code was for reproduction only, or was written like that in general, in the latter case it needs a bit of adjustment to conform to spec.
 

Here is a variant where the Subscriber is just consuming items and counting how many items have been received. The asynchronous sender reads this count to know when to send the next value to expose the internal race.

public class SpPublishAlt {
@Test
public void test() throws Exception {
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
SpConsumer c = new SpConsumer();
sp.subscribe(c);

exec.submit(() -> {
while (c.upstream == null) ;

int i = 0;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });

while (i < SpConsumer.N) {
while (c.getAcquire() == i);
i++;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });
}
});

if (!c.cdl.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("Timed out " + c.getAcquire());
}
} finally {
exec.shutdownNow();
}
}

static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {

static final int N = 1 << 20;

final CountDownLatch cdl = new CountDownLatch(1);

volatile Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
subscription.request(N);
}

@Override
public void onNext(Object item) {
System.out.println(item);
int i = getPlain() + 1;
setRelease(i);
if (i == N) {
upstream.cancel();
cdl.countDown();
}
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
cdl.countDown();
}

@Override
public void onComplete() {
cdl.countDown();
}
}
}

2017-09-26 13:58 GMT+02:00 Viktor Klang <[hidden email]>:
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest




--
Best regards,
David Karnok



--
Cheers,



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Viktor Klang


On Tue, Sep 26, 2017 at 2:49 PM, Dávid Karnok <[hidden email]> wrote:
Addendum to the rules listed:

§2.5: in practice, this should not happen with reasonable reactive libraries. Depending on how you trust the Publisher, omitting such checks is not a disaster.

It's not a disaster, it's only non-compliance—it is a MUST rule, after all.
 
Plus, the JavaDoc's example is also hints about plain store of the Flow.Subscription:


class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {

I've been meaning to send some JavaDoc contributions, I'll keep this one in mind! :)
 

     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }
§2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException in general, but often you can't do much about that case.

Most definitely, but that exception should most definitely not infect the issuer of the signal. The spec is unambiguous about this (for a reason!). :)
 
Plus again, the example above uses submit() which can also throw: http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html#submit-T-

2017-09-26 14:36 GMT+02:00 Viktor Klang <[hidden email]>:


On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <[hidden email]> wrote:
Viktor, can you be more specific? It just looks uncommon to me but otherwise legitimate. The fact that there is no cancel() call when all requested data has arrived is irrelevant from the perspective of the underlying bug in SubmissionPublisher.

Absolutely, my comment was not about the SubmissionPublisher bug. I was unsure whether the provided code was for reproduction only, or was written like that in general, in the latter case it needs a bit of adjustment to conform to spec.
 

Here is a variant where the Subscriber is just consuming items and counting how many items have been received. The asynchronous sender reads this count to know when to send the next value to expose the internal race.

public class SpPublishAlt {
@Test
public void test() throws Exception {
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
SpConsumer c = new SpConsumer();
sp.subscribe(c);

exec.submit(() -> {
while (c.upstream == null) ;

int i = 0;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });

while (i < SpConsumer.N) {
while (c.getAcquire() == i);
i++;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });
}
});

if (!c.cdl.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("Timed out " + c.getAcquire());
}
} finally {
exec.shutdownNow();
}
}

static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {

static final int N = 1 << 20;

final CountDownLatch cdl = new CountDownLatch(1);

volatile Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
subscription.request(N);
}

@Override
public void onNext(Object item) {
System.out.println(item);
int i = getPlain() + 1;
setRelease(i);
if (i == N) {
upstream.cancel();
cdl.countDown();
}
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
cdl.countDown();
}

@Override
public void onComplete() {
cdl.countDown();
}
}
}

2017-09-26 13:58 GMT+02:00 Viktor Klang <[hidden email]>:
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest




--
Best regards,
David Karnok



--
Cheers,



--
Best regards,
David Karnok



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Dávid Karnok
Well, the spec is the spec. (Consequently, I guess you'll have much fun with this one: https://examples.javacodegeeks.com/core-java/java-9-reactive-streams-example/)

2017-09-26 15:42 GMT+02:00 Viktor Klang <[hidden email]>:


On Tue, Sep 26, 2017 at 2:49 PM, Dávid Karnok <[hidden email]> wrote:
Addendum to the rules listed:

§2.5: in practice, this should not happen with reasonable reactive libraries. Depending on how you trust the Publisher, omitting such checks is not a disaster.

It's not a disaster, it's only non-compliance—it is a MUST rule, after all.
 
Plus, the JavaDoc's example is also hints about plain store of the Flow.Subscription:


class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {

I've been meaning to send some JavaDoc contributions, I'll keep this one in mind! :)
 

     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }
§2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException in general, but often you can't do much about that case.

Most definitely, but that exception should most definitely not infect the issuer of the signal. The spec is unambiguous about this (for a reason!). :)
 
Plus again, the example above uses submit() which can also throw: http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html#submit-T-

2017-09-26 14:36 GMT+02:00 Viktor Klang <[hidden email]>:


On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <[hidden email]> wrote:
Viktor, can you be more specific? It just looks uncommon to me but otherwise legitimate. The fact that there is no cancel() call when all requested data has arrived is irrelevant from the perspective of the underlying bug in SubmissionPublisher.

Absolutely, my comment was not about the SubmissionPublisher bug. I was unsure whether the provided code was for reproduction only, or was written like that in general, in the latter case it needs a bit of adjustment to conform to spec.
 

Here is a variant where the Subscriber is just consuming items and counting how many items have been received. The asynchronous sender reads this count to know when to send the next value to expose the internal race.

public class SpPublishAlt {
@Test
public void test() throws Exception {
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
SpConsumer c = new SpConsumer();
sp.subscribe(c);

exec.submit(() -> {
while (c.upstream == null) ;

int i = 0;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });

while (i < SpConsumer.N) {
while (c.getAcquire() == i);
i++;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });
}
});

if (!c.cdl.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("Timed out " + c.getAcquire());
}
} finally {
exec.shutdownNow();
}
}

static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {

static final int N = 1 << 20;

final CountDownLatch cdl = new CountDownLatch(1);

volatile Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
subscription.request(N);
}

@Override
public void onNext(Object item) {
System.out.println(item);
int i = getPlain() + 1;
setRelease(i);
if (i == N) {
upstream.cancel();
cdl.countDown();
}
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
cdl.countDown();
}

@Override
public void onComplete() {
cdl.countDown();
}
}
}

2017-09-26 13:58 GMT+02:00 Viktor Klang <[hidden email]>:
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest




--
Best regards,
David Karnok



--
Cheers,



--
Best regards,
David Karnok



--
Cheers,



--
Best regards,
David Karnok

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

Viktor Klang
Thanks for the link.

I think the most scalable approach to help interoperability is to evangelize the use of the TCK whenever possible, helps people catch subtle bugs quickly.


On Tue, Sep 26, 2017 at 4:13 PM, Dávid Karnok <[hidden email]> wrote:
Well, the spec is the spec. (Consequently, I guess you'll have much fun with this one: https://examples.javacodegeeks.com/core-java/java-9-reactive-streams-example/)

2017-09-26 15:42 GMT+02:00 Viktor Klang <[hidden email]>:


On Tue, Sep 26, 2017 at 2:49 PM, Dávid Karnok <[hidden email]> wrote:
Addendum to the rules listed:

§2.5: in practice, this should not happen with reasonable reactive libraries. Depending on how you trust the Publisher, omitting such checks is not a disaster.

It's not a disaster, it's only non-compliance—it is a MUST rule, after all.
 
Plus, the JavaDoc's example is also hints about plain store of the Flow.Subscription:


class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {

I've been meaning to send some JavaDoc contributions, I'll keep this one in mind! :)
 

     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }
§2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException in general, but often you can't do much about that case.

Most definitely, but that exception should most definitely not infect the issuer of the signal. The spec is unambiguous about this (for a reason!). :)
 
Plus again, the example above uses submit() which can also throw: http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html#submit-T-

2017-09-26 14:36 GMT+02:00 Viktor Klang <[hidden email]>:


On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <[hidden email]> wrote:
Viktor, can you be more specific? It just looks uncommon to me but otherwise legitimate. The fact that there is no cancel() call when all requested data has arrived is irrelevant from the perspective of the underlying bug in SubmissionPublisher.

Absolutely, my comment was not about the SubmissionPublisher bug. I was unsure whether the provided code was for reproduction only, or was written like that in general, in the latter case it needs a bit of adjustment to conform to spec.
 

Here is a variant where the Subscriber is just consuming items and counting how many items have been received. The asynchronous sender reads this count to know when to send the next value to expose the internal race.

public class SpPublishAlt {
@Test
public void test() throws Exception {
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
SpConsumer c = new SpConsumer();
sp.subscribe(c);

exec.submit(() -> {
while (c.upstream == null) ;

int i = 0;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });

while (i < SpConsumer.N) {
while (c.getAcquire() == i);
i++;
sp.offer(i, (a, b) -> { throw new RuntimeException(); });
}
});

if (!c.cdl.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("Timed out " + c.getAcquire());
}
} finally {
exec.shutdownNow();
}
}

static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {

static final int N = 1 << 20;

final CountDownLatch cdl = new CountDownLatch(1);

volatile Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstream = subscription;
subscription.request(N);
}

@Override
public void onNext(Object item) {
System.out.println(item);
int i = getPlain() + 1;
setRelease(i);
if (i == N) {
upstream.cancel();
cdl.countDown();
}
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
cdl.countDown();
}

@Override
public void onComplete() {
cdl.countDown();
}
}
}

2017-09-26 13:58 GMT+02:00 Viktor Klang <[hidden email]>:
Hi Pavel,

I trust that the code you submitted is for test case reproduction primarily, but there are a few spec violations in your Subscriber implementation that you'll want to address.

We're currently in the process of releasing a bridge jar between org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <[hidden email]> wrote:
Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

    public class SubmissionPublisherTest {

        private final static int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new
SubmissionPublisher<>();
        private final ExecutorService pubExecutor =
Executors.newSingleThreadExecutor();
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new
InternalError(); };
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
    //        pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts
the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.

P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
undersynchronized. In particular, it might be the case that updates to this
field are not always seen by the consumer task in
BufferedSubscription.checkEmpty.

Is there any chance `tail` field is simply missing volatile modifier?

Thanks,
-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest




--
Best regards,
David Karnok



--
Cheers,



--
Best regards,
David Karnok



--
Cheers,



--
Best regards,
David Karnok



--
Cheers,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: A race in SubmissionPublisher?

JSR166 Concurrency mailing list
In reply to this post by Doug Lea

Back to...

On 09/26/2017 07:51 AM, Doug Lea wrote:

> On 09/25/2017 09:21 AM, Pavel Rappo wrote:
>> Hi,
>>
>> I've been using SubmissionPublisher in my own publisher implementation in order
>> to reuse its complex state machine that serializes invocations to subscribers.
>>
>> While testing my implementation I ran into what I believe might be a race
>> condition in SubmissionPublisher.
>
> Thanks for finding a use case that at first doesn't even seem legal,
> but I agree should work according to spec, and breaks assumptions
> about produce-consumer relations that can cause a wakeup not to be
> issued. I'll post a fix at the corresponding bug report:
>   https://bugs.openjdk.java.net/browse/JDK-8187947
> after deciding which of a couple of ways to address.
>

It took a while to decide, but a fix is now in jsr166 and will
hopefully be reviewed for next JDK release. Reviews would be welcome
(see
http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java?view=log)

While I'm at it: One of the goals for SubmissionPublisher is to
be a good (often the best) choice for any producer-consumer
design, not only those plugging into existing Reactive frameworks.
Performance should be good, and the API makes them the easy
to express once you get past initial unfamiliarity.

-Doug



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest