A case of silent cancellation of SubmissionPublisher's tasks

Pavel Rappo
Hi,

I have a question regarding a behaviour of a SubmissionPublisher. The robustness
of its asynchronous machinery relies on the fact that a task being submitted to
an executor will either run or fail eventually. In both cases, appropriate
feedback will be provided.

The cases of RejectedExecutionException upon submitting, errors during delivery
and thread interruptions are taken care of.
But what about a subtle case when the task has been submitted and later removed
from the executor's internal queue? For example, through a cancellation of the
submitted task or a shutdown of the executor. In this case the
SubmissionPublisher won't get any feedback and might end up silently waiting
for nothing for an arbitrary amount of time.

Consider the following code snippet.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Flow;
    import java.util.concurrent.SubmissionPublisher;

    public class SilentCancellation {

        public static void main(String[] args) {

            ExecutorService testSupport = Executors.newCachedThreadPool();
            // Both publishers deliver through the same single-threaded executor.
            ExecutorService sharedExecutor = Executors.newFixedThreadPool(1);

            // Released by the first subscriber that reaches onSubscribe.
            CountDownLatch latch = new CountDownLatch(1);

            SubmissionPublisher<Integer> p1 =
                    new SubmissionPublisher<>(sharedExecutor, 1024);
            SubmissionPublisher<Integer> p2 =
                    new SubmissionPublisher<>(sharedExecutor, 1024);

            testSupport.execute(() -> {
                p1.subscribe(createSubscriber(latch));
                p2.subscribe(createSubscriber(latch));

                for (int i = 0; i < 100; i++)
                    p1.submit(i);

                for (int i = 0; i < 100; i++)
                    p2.submit(i);
            });

            // Shut the shared executor down as soon as one subscription is live;
            // tasks still sitting in its queue are removed without running.
            testSupport.execute(() -> {
                try {
                    latch.await();
                } catch (InterruptedException ignored) { }
                sharedExecutor.shutdownNow();
            });

            testSupport.shutdown();
        }

        private static Flow.Subscriber<Integer> createSubscriber(CountDownLatch latch) {
            return new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(100);
                    latch.countDown();
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println(this + ":" + integer);
                }

                @Override
                public void onError(Throwable throwable) { }

                @Override
                public void onComplete() { }
            };
        }
    }

It's very probable that during a run, the subscriber attached to 'p2' won't
receive anything. Moreover, 'p2' itself won't get any errors.
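
The underlying mechanism can be seen in isolation. The following is a minimal
sketch, separate from the snippet above, showing that shutdownNow() simply
hands back the tasks that were queued but never started; nothing else is told
about them. The names DroppedTasksDemo and countNeverStarted are made up for
this illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DroppedTasksDemo {

    // Returns how many queued tasks shutdownNow() removed without running them.
    static int countNeverStarted(int queued) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);

        // Occupy the only worker thread so that later tasks stay in the queue.
        executor.execute(() -> {
            workerBusy.countDown();
            try {
                never.await();
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the running task; just exit.
            }
        });

        try {
            workerBusy.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // These tasks are accepted (no RejectedExecutionException) but only queued.
        for (int i = 0; i < queued; i++)
            executor.execute(() -> { });

        // The queue is drained and returned; the tasks themselves never run.
        List<Runnable> neverStarted = executor.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println("never started: " + countNeverStarted(3));
    }
}
```

Only the caller of shutdownNow() sees the returned list, so whoever submitted
those tasks has no way to learn that they were dropped.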

Is this expected? Thanks.

-Pavel
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: A case of silent cancellation of SubmissionPublisher's tasks

Dávid Karnok

In RxJava (1.x, 2.x and its j.u.c.Flow port), it is expected that active flows are cancelled before the standard threadpools are shut down because tasks have no knowledge about what they drive. Besides, if they knew, firing onError could trigger more tasks to be scheduled and at that point, who knows what would get through and what would fail again and again.
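
As a rough sketch of that ordering with SubmissionPublisher (not RxJava code):
end the flow first, wait until the subscriber has seen the terminal signal, and
only then stop the pool. Here the flow is ended with close() rather than
cancelled from the subscriber side, and the names OrderedShutdownDemo and
publishThenShutdown are made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderedShutdownDemo {

    // Publishes `count` items, ends the flow, and only then stops the pool.
    // Returns the number of items the subscriber received.
    static int publishThenShutdown(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(executor, 1024);
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                received.incrementAndGet();
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        for (int i = 0; i < count; i++)
            publisher.submit(i);

        // 1. End the flow: onComplete is signalled after the pending items.
        publisher.close();

        // 2. Wait (bounded) until the subscriber has seen the terminal signal.
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 3. Only now stop the pool; no delivery task is left to be dropped.
        executor.shutdown();
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("received: " + publishThenShutdown(100));
    }
}
```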

On Jan 13, 2016, at 12:54, "Pavel Rappo" <[hidden email]> wrote:

Re: A case of silent cancellation of SubmissionPublisher's tasks

Viktor Klang
Apologies in advance for "butting in",

To generalise even further around
"The robustness of its asynchronous machinery relies on the fact that a task being submitted to an executor will either run or fail eventually."
and
"But what about a subtle case when the task has been submitted and later removed from the executor's internal queue?"

The only way to get around that is to require an interface other than Executor, since the guarantees of Executor are too weak. An implementation of that other interface which violates its contract is indistinguishable from a bug.
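
To make the idea concrete, here is a minimal sketch of what such a stronger interface could look like. Everything in it is hypothetical (RunOrNotifyExecutor, PoolBacked, the onDropped callback); nothing like it exists in java.util.concurrent. The assumed contract is that every accepted task either runs or has its onDropped callback invoked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class StrongerExecutorSketch {

    // Hypothetical contract: each task either runs, or onDropped is invoked.
    interface RunOrNotifyExecutor {
        void execute(Runnable task, Runnable onDropped);
        void shutdownNow();
    }

    // One possible implementation on top of an ordinary thread pool.
    static final class PoolBacked implements RunOrNotifyExecutor {
        private final ExecutorService pool = Executors.newFixedThreadPool(1);

        // Keeps the drop callback next to the task while it waits in the queue.
        private static final class Tracked implements Runnable {
            final Runnable task;
            final Runnable onDropped;

            Tracked(Runnable task, Runnable onDropped) {
                this.task = task;
                this.onDropped = onDropped;
            }

            @Override
            public void run() {
                task.run();
            }
        }

        @Override
        public void execute(Runnable task, Runnable onDropped) {
            try {
                pool.execute(new Tracked(task, onDropped));
            } catch (RejectedExecutionException e) {
                onDropped.run(); // rejected up front: report it the same way
            }
        }

        @Override
        public void shutdownNow() {
            // shutdownNow() returns the tasks that never started; notify each one.
            for (Runnable r : pool.shutdownNow())
                if (r instanceof Tracked)
                    ((Tracked) r).onDropped.run();
        }
    }

    // Queues `queued` tasks behind a blocked worker and counts drop notifications.
    static int countDropNotifications(int queued) {
        PoolBacked executor = new PoolBacked();
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        AtomicInteger dropped = new AtomicInteger();

        // Occupy the single worker so that the following tasks stay queued.
        executor.execute(() -> {
            workerBusy.countDown();
            try {
                never.await();
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(); this task did start running
            }
        }, dropped::incrementAndGet);

        try {
            workerBusy.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        for (int i = 0; i < queued; i++)
            executor.execute(() -> { }, dropped::incrementAndGet);

        executor.shutdownNow();
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("drop notifications: " + countDropNotifications(3));
    }
}
```

A publisher written against such an interface could turn each drop notification into an onError for the affected subscriber, which a plain Executor gives it no way to do.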



On Wed, Jan 13, 2016 at 2:19 PM, Dávid Karnok <[hidden email]> wrote:

--
Cheers,
