java.util.concurrent.Flow with opposite direction of item flow

java.util.concurrent.Flow with opposite direction of item flow

JSR166 Concurrency mailing list
Consider the following structure of a multithreaded program:
- a single BlockingQueue
- several Producer threads which push their results to the queue
- several Consumer threads which take items from the queue

Now I want to replace some of the Producers and Consumers with their
asynchronous analogs: AsyncProducer and AsyncConsumer.
First, I need to add asynchronous interfaces to the BlockingQueue, making it
an AsyncBlockingQueue. As a base, I can use my own implementation rather than
those from java.util.concurrent.
For the interaction between AsyncBlockingQueue and AsyncConsumer, the
interfaces from java.util.concurrent.Flow are suitable: AsyncBlockingQueue
implements Flow.Publisher and AsyncConsumer implements Flow.Subscriber.
But for communication between AsyncProducer and AsyncBlockingQueue,
java.util.concurrent.Flow is not suitable.
Suppose AsyncProducer implements Flow.Publisher and AsyncBlockingQueue
implements Flow.Subscriber.
Then, when the queue has room to store items, it should call
Flow.Subscription#request(n), where the sum of all arguments 'n' should not
exceed the amount of available buffer space.
But as there are many subscriptions, which ones should it choose?
The queue would need to know which AsyncProducers are ready to submit data;
otherwise, its buffer space is wasted on producers that have nothing to send.
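
The allocation problem can be made concrete with a minimal single-threaded
sketch. It is not taken from the gist: the names DemandSplitSketch, Producer
and SharedBuffer, and the policy of splitting free slots evenly, are
assumptions chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandSplitSketch {

    /** Hypothetical producer: emits at most `ready` items, synchronously, when asked. */
    static final class Producer implements Flow.Publisher<String> {
        private final String name;
        private int ready;

        Producer(String name, int ready) {
            this.name = name;
            this.ready = ready;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // Deliver what is ready, but never more than was requested.
                    while (n-- > 0 && ready > 0) {
                        ready--;
                        subscriber.onNext(name);
                    }
                }

                @Override
                public void cancel() {
                    ready = 0;
                }
            });
        }
    }

    /** The queue side: one Flow.Subscriber per producer, all feeding one buffer. */
    static final class SharedBuffer {
        final List<String> items = new ArrayList<>();

        Flow.Subscriber<String> inletWithDemand(long share) {
            return new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(share); }
                @Override public void onNext(String item) { items.add(item); }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }
    }

    /** Returns how many of the 4 slots get filled when demand is split evenly. */
    static int filledSlots() {
        int capacity = 4;
        SharedBuffer queue = new SharedBuffer();
        Producer busy = new Producer("busy", 10);
        Producer idle = new Producer("idle", 0);
        busy.subscribe(queue.inletWithDemand(capacity / 2));
        idle.subscribe(queue.inletWithDemand(capacity / 2));
        return queue.items.size();
    }

    public static void main(String[] args) {
        System.out.println("filled slots: " + filledSlots() + " of 4");
    }
}
```

In this sketch half of the buffer stays reserved for the idle producer while
the busy one still has items to send, which is the waste described above.
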
The evident solution to this problem is that Producers must play an active
role and so be like Subscribers, able to call Subscription#request() at their
own discretion.
I named such a protocol ReverseFlow (native English speakers are invited to
propose a better name).
It is published at
https://gist.github.com/akaigoro/9506659d7d87a85a2a58a647405d85d6

So I'd like to propose including the ReverseFlow interfaces in the JDK and
providing a reference implementation of an AsyncBlockingQueue interface, which
extends BlockingQueue, ReverseFlow.Publisher, and Flow.Publisher.






--
Sent from: http://jsr166-concurrency.10961.n7.nabble.com/
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: java.util.concurrent.Flow with opposite direction of item flow

Hi.

Your use case can be handled with existing operators from reactive libraries without introducing new protocols. The examples below use RxJava.

For the multiple-producer part, you can merge() a set of known generators implemented as Publishers:

    Publisher<T> p1 = ...
    Publisher<T> p2 = ...  
    Publisher<T> p3 = ...    

    Flowable<T> merged = Flowable.mergeArray(p1, p2, p3);

If the generators can appear dynamically, you can submit them to a Processor and identity-flatMap the sequence:

    PublishProcessor<Publisher<T>> processor = PublishProcessor.create();

    Flowable<T> mergedDynamic = processor.flatMap(p -> p);

    processor.onNext(p1);
    processor.onNext(p2);
    // ...

For the multiple distinct consumer part, there is no standard RxJava component. However, I maintain a separate library of uncommon or esoteric reactive components that includes DispatchWorkProcessor (https://github.com/akarnokd/RxJavaExtensions#dispatchworkprocessor), whose purpose is to dispatch each item to exactly one of the consumers that are ready to receive.

    DispatchWorkProcessor<T> dispatch = DispatchWorkProcessor.create(Schedulers.newThread());

    merged.subscribe(dispatch);

    dispatch.subscribe(consumer1);
    dispatch.subscribe(consumer2); 
    dispatch.subscribe(consumer3);

For interoperating with JDK Flow types, you can use my other library for Flow interop: https://github.com/akarnokd/RxJavaJdk9Interop#examples

(In reply to Alexei Kaigorodov via Concurrency-interest, Thu, 19 Dec 2019, 14:23)


--
Best regards,
David Karnok


Re: java.util.concurrent.Flow with opposite direction of item flow

How can it be both Blocking and Async?

I think there is some misalignment with the intended uses of Producer and Consumer. The whole idea behind the Flow protocol is that both Producers and Consumers are backpressure-aware. But the Blocking Queue in the middle implies that you want a Producer that is not backpressure-aware, so backpressure is essentially implemented through blocking.

Alex


(In reply to Alexei Kaigorodov via Concurrency-interest, Thu, 19 Dec 2019, 14:24)

Re: How can it be both Blocking and Async?

/How can it be both Blocking and Async?/
Good question.
Consider a thread attempting to perform BlockingQueue#take(). Its algorithm
is blocked, in contrast to BlockingQueue#poll(), where the algorithm is not
blocked.
Note that the hardware processor is not blocked: the thread just leaves it.
So "blocking" means blocking of the algorithm, not of the underlying
processor.
In the asynchronous case, if a task cannot get the required data, it leaves
the worker thread. The worker thread (usually taken from a thread pool) is
not blocked. This is similar to the sync case, only the role of the processor
is played by the worker thread. Since the algorithm is blocked, we call this
asynchronous case blocking as well.
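
A minimal sketch of this distinction, using only JDK classes. AsyncTakeQueue
and takeAsync() are hypothetical names, not part of the JDK or of the gist,
and the sketch ignores capacity limits on put(). On an empty queue, take()
would park the calling thread, poll() returns null at once, and the
hypothetical takeAsync() returns a pending future, so the algorithm is
suspended while the thread stays free.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

final class BlockingVsAsyncSketch {

    /** Hypothetical queue whose take hands back a future instead of parking the caller. */
    static final class AsyncTakeQueue<T> {
        private final Queue<T> items = new ArrayDeque<>();
        private final Queue<CompletableFuture<T>> waiters = new ArrayDeque<>();

        synchronized CompletableFuture<T> takeAsync() {
            T item = items.poll();
            if (item != null) {
                return CompletableFuture.completedFuture(item);
            }
            CompletableFuture<T> pending = new CompletableFuture<>();
            waiters.add(pending); // the task is suspended; no thread waits
            return pending;
        }

        void put(T item) {
            CompletableFuture<T> waiter;
            synchronized (this) {
                waiter = waiters.poll();
                if (waiter == null) {
                    items.add(item);
                    return;
                }
            }
            waiter.complete(item); // resume the suspended task outside the lock
        }
    }

    static String demo() {
        StringBuilder log = new StringBuilder();

        // poll() on an empty queue returns null immediately; take() would block here.
        BlockingQueue<String> sync = new ArrayBlockingQueue<>(1);
        log.append("poll=").append(sync.poll()).append(';');

        AsyncTakeQueue<String> async = new AsyncTakeQueue<>();
        CompletableFuture<String> pending = async.takeAsync();
        pending.thenAccept(v -> log.append("resumed with ").append(v));
        log.append(pending.isDone() ? "done;" : "suspended;");
        async.put("x"); // completes the future and runs the continuation
        return log.toString();
    }
}
```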




Re: java.util.concurrent.Flow with opposite direction of item flow

"But the Blocking Queue in the middle implies that you want a Producer that
is not backpressure-aware"
No. In the synchronous (pure multithreaded) case, the Producer is
backpressure-aware: it is blocked when the queue is full.
And I want to keep backpressure in the async case, both for the Producer
writing to the queue and for the Consumer reading from the queue.




Re: java.util.concurrent.Flow with opposite direction of item flow

/   Publisher<T> p1 = ...
    Publisher<T> p2 = ...  
    Publisher<T> p3 = ...    
...

If the generators can appear dynamically, you can submit them to a Processor
and identity-flatMap the sequence:

    PublishProcessor<Publisher<T>> processor = PublishProcessor.create();

    Flowable<T> mergedDynamic = processor.flatMap(p -> p);

    processor.onNext(p1);
    processor.onNext(p2);
  /
1. Does it mean that the output of Publishers p1 and p2 appear on the output
of processor in parallel, without delays?
2. Is there an opposite operation to  /processor.onNext(p1)/, when p1 wants
to stop sending data to the processor?





Re: java.util.concurrent.Flow with opposite direction of item flow

> 1. Does it mean that the output of Publishers p1 and p2 appear on the output
> of processor in parallel, without delays?

No. merge/flatMap produces a Publisher and as such, its onNext is serialized with respect to the sources. You can think of them as putting items into a concurrent queue but only one thread is ever polling the same queue.
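
The "concurrent queue with a single poller" picture can be sketched with plain
JDK classes. This is not RxJava's actual implementation; SerializedEmitter and
its work-in-progress counter are an illustrative assumption showing one common
way to obtain the same guarantee: any thread may enqueue, but only the thread
that moves the counter from 0 to 1 drains the queue.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class SerializedMergeSketch {

    /** Many threads may call emit(); downstream.accept() never runs concurrently. */
    static final class SerializedEmitter<T> {
        private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
        private final Consumer<T> downstream;

        SerializedEmitter(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void emit(T item) {
            queue.offer(item);
            if (wip.getAndIncrement() != 0) {
                return; // another thread is draining and will pick this item up
            }
            int missed = 1;
            do {
                T next;
                while ((next = queue.poll()) != null) {
                    downstream.accept(next);
                }
                missed = wip.addAndGet(-missed); // non-zero: items arrived meanwhile
            } while (missed != 0);
        }
    }

    /** Returns {items delivered, maximum number of threads seen inside downstream}. */
    static int[] run(int threads, int perThread) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();
        SerializedEmitter<Integer> emitter = new SerializedEmitter<>(item -> {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            delivered.incrementAndGet();
            inside.decrementAndGet();
        });
        Thread[] sources = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            sources[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    emitter.emit(j);
                }
            });
            sources[i].start();
        }
        for (Thread t : sources) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { delivered.get(), maxInside.get() };
    }
}
```

Calling run(4, 1000) should deliver all 4000 items while never observing more
than one thread inside the downstream callback at a time.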

> 2. Is there an opposite operation to /processor.onNext(p1)/, when p1 wants
> to stop sending data to the processor?

p1 should signal onComplete(). If you don't exactly control p1, there are operators (such as takeUntil) and ways to intervene:

    Publisher<T> p1 = ...
    PublishProcessor<Object> stopP1 = PublishProcessor.create();

    processor.onNext(Flowable.fromPublisher(p1).takeUntil(stopP1));

    // later
    stopP1.onComplete();


(In reply to Alexei Kaigorodov via Concurrency-interest, Thu, 19 Dec 2019, 17:33)


--
Best regards,
David Karnok


Re: java.util.concurrent.Flow with opposite direction of item flow

/ merge/flatMap produces a Publisher and as such, its onNext is serialized
with respect to the sources/
So I assume RxJava cannot provide a solution for an asynchronous
Producer=>BlockingQueue communication pattern that behaves exactly like
traditional synchronous communication.




Re: java.util.concurrent.Flow with opposite direction of item flow

Reactive Streams and JDK Flow are about non-blocking flow control, so in RxJava we avoid blocking in our constructs as far as possible. Hence we don't provide means to work with blocking queues directly; you have to lay the blocking pipes across operators yourself.

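    BlockingQueue<T> q = ...

    // producer side: put() blocks the emitting thread while the queue is full
    generator.doOnNext(q::put).subscribe();

    // consumer side: generate() invokes the callback once per requested item
    Flowable.<T>generate(emitter -> emitter.onNext(q.take())).subscribe();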


I'd suggest rethinking the prerequisite of blocking constructs to solve your original problem.

(In reply to Alexei Kaigorodov via Concurrency-interest, Fri, 20 Dec 2019, 7:39)


--
Best regards,
David Karnok


Re: java.util.concurrent.Flow with opposite direction of item flow

Backpressure awareness is when the component can tell when it is going to get blocked. Blocking is the result of being unaware - the queue doesn't provide any API that the Producer can interact with to work out when it is going to get blocked. RxJava and JDK Flow provide such an API.
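
As a rough illustration of what such an API gives the producing side, here is
a simplified, single-subscriber sketch. Source and tryEmit() are hypothetical
names; only Flow.Subscription#request(n) and Flow.Subscriber#onNext come from
the JDK. The source keeps a count of outstanding demand and can see, before
emitting, that it would otherwise have to wait.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

final class DemandAwareSketch {

    /** Hypothetical source that tracks the demand signalled through request(n). */
    static final class Source implements Flow.Subscription {
        private final AtomicLong demand = new AtomicLong();
        private final Flow.Subscriber<? super String> subscriber;

        Source(Flow.Subscriber<? super String> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            // Simplified: a compliant publisher also signals an error for n <= 0
            // and caps the accumulated demand at Long.MAX_VALUE.
            if (n > 0) {
                demand.addAndGet(n);
            }
        }

        @Override
        public void cancel() {
            demand.set(0);
        }

        /** Emits only if the consumer asked for more; returns false instead of blocking. */
        boolean tryEmit(String item) {
            long current;
            do {
                current = demand.get();
                if (current == 0) {
                    return false;
                }
            } while (!demand.compareAndSet(current, current - 1));
            subscriber.onNext(item);
            return true;
        }
    }

    static String demo() {
        StringBuilder received = new StringBuilder();
        Flow.Subscriber<String> consumer = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(String item) { received.append(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        Source source = new Source(consumer);
        consumer.onSubscribe(source);
        // Two items are accepted; the third is refused because demand is exhausted.
        boolean allAccepted = source.tryEmit("a") && source.tryEmit("b") && source.tryEmit("c");
        return received + ":" + allAccepted;
    }
}
```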

Alex

(In reply to Alexei Kaigorodov via Concurrency-interest, Thu, 19 Dec 2019, 17:07)

Re: java.util.concurrent.Flow with opposite direction of item flow

/we don't provide means to work with blocking queues directly/
In order to solve the problem described in the first message, I need a queue
which implements both a synchronous blocking interface and asynchronous
non-blocking interfaces with backpressure. Evidently, such a queue must have
2 different async interfaces: one for writing into the queue and one for
reading from it. I'd like to reuse existing interfaces, for
interoperability. I found that j.u.c.Flow is suitable for reading, but could
not find an existing interface for writing. However, it was easy to design
such an interface, and I provided a reference to my design. I do not need an
implementation; I have already written one.





Re: java.util.concurrent.Flow with opposite direction of item flow

/RxJava and JDK Flow provide such an API./
This is half true. They provide an async interface for reading, but not for
writing. See my reply to Dávid above.




Re: java.util.concurrent.Flow with opposite direction of item flow

I don't see why one would expect symmetry between consumers and producers.

The API you provided a gist for is hard to understand without further explanation of what behaviour is expected from the Producers and Consumers.

Alex

(In reply to Alexei Kaigorodov via Concurrency-interest, Fri, 20 Dec 2019, 13:19)

Re: java.util.concurrent.Flow with opposite direction of item flow

/I don't see why one would expect symmetry between consumers and producers./
This symmetry stems from the original synchronous interface: when
communicating with a BlockingQueue, both consumers and producers are callers
of the methods of the queue (take() and put(), respectively). When a
synchronous interface is converted to an asynchronous one, callers are
transformed to subscribers, and callees are transformed to publishers. So
producers and consumers become subscribers, and the queue becomes a Publisher
of 2 different kinds: Flow.Publisher and ReverseFlow.Publisher. Since the
signatures of the methods put() and take() are different, the resulting
asynchronous interfaces are different. When all the preparations like
subscribe() and request() are done and the actual transmission of information
can be performed, it is always done by the publisher side, and in our case
the publisher side is the queue on both ends. So to move information from a
producer to the queue, the queue calls ReverseFlow.Subscriber.remove(), and
to move information from the queue to a consumer, the queue calls
Flow.Subscriber.onNext(item).
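
The producer-to-queue half of this description can be sketched as follows.
The interface shapes are guesses: only the names ReverseFlow.Publisher,
ReverseFlow.Subscriber.remove() and Subscription#request() appear in this
thread, so the signatures, BoundedQueue and CountingProducer below are
assumptions and may differ from the gist. Completion and error signals are
left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class ReverseFlowSketch {

    // Hypothetical shapes; the interfaces in the gist may differ.
    interface Subscription {
        void request(long n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        T remove(); // called by the queue to pull one item out of the producer
    }

    interface Publisher<T> {
        void subscribe(Subscriber<T> producer);
    }

    /** The queue is the publisher; producers subscribe and announce ready items. */
    static final class BoundedQueue<T> implements Publisher<T> {
        final Queue<T> buffer = new ArrayDeque<>();
        private final int capacity;

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public void subscribe(Subscriber<T> producer) {
            producer.onSubscribe(new Subscription() {
                private boolean cancelled;

                @Override
                public void request(long n) {
                    // Pull while there is room. A fuller version would remember the
                    // unmet part of n and resume once consumers free some slots.
                    while (n-- > 0 && !cancelled && buffer.size() < capacity) {
                        buffer.add(producer.remove());
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Hypothetical producer that hands out consecutive integers when pulled. */
    static final class CountingProducer implements Subscriber<Integer> {
        Subscription subscription;
        private int next;

        @Override public void onSubscribe(Subscription s) { subscription = s; }
        @Override public Integer remove() { return next++; }
    }

    static int filledSlots() {
        BoundedQueue<Integer> queue = new BoundedQueue<>(4);
        CountingProducer busy = new CountingProducer();
        CountingProducer idle = new CountingProducer();
        queue.subscribe(busy);
        queue.subscribe(idle);
        busy.subscription.request(3); // only the producer with data asks for room
        return queue.buffer.size();
    }
}
```

Here the idle producer never calls request(), so no buffer space is set aside
for it; the busy producer's three items occupy three of the four slots.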




Re: java.util.concurrent.Flow with opposite direction of item flow

This symmetry is superficial. The methods block for different reasons, which means they don't necessarily have the same representation in an asynchronous world.

If you were to try and add symmetry, you would first need a primitive to replace the queue. Let's call it HalfQueue. It has just one method, Y put(X). Then you "entangle" two such half-queues so that the argument of put() on one end is the return value of the put() on the other end. Both the producers and the consumers then call put() on different ends of the "entangled" half-queues, and block until a corresponding put() is called on the other end. The consumers "produce" Unit objects and heed the result of such a put(); the producers put nontrivial objects and discard the Unit "produced" by the consumer.

The asymmetry should become obvious: for this to represent a BlockingQueue with a buffer of some size, the consumer end must "produce" some Unit objects ahead of time. This difference in where they are placed in time represents the fact that the consumers drive the flow by requesting a number of items. This is the key difference between the Producers and the Consumers, whether in a synchronous world with blocking or in an asynchronous world.
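
The unbuffered case of this construction resembles the JDK's
java.util.concurrent.Exchanger: each exchange(x) call blocks until a partner
arrives and then returns the partner's argument. The sketch below uses it as a
stand-in for a pair of entangled half-queues; HalfQueueSketch, UNIT and
rendezvous() are illustrative names, and the buffered variant, where the
consumer supplies Units ahead of time, is not shown.

```java
import java.util.concurrent.Exchanger;

final class HalfQueueSketch {

    /** Stand-in for the Unit value the consumer "produces". */
    static final Object UNIT = new Object();

    /** Runs one producer/consumer rendezvous and returns what the consumer received. */
    static Object rendezvous(Object item) {
        Exchanger<Object> link = new Exchanger<>();
        Object[] producerGot = new Object[1];

        Thread producer = new Thread(() -> {
            try {
                // Producer end: hands over the item and receives a Unit in return.
                producerGot[0] = link.exchange(item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Consumer end: "produces" a Unit and heeds the value that comes back.
            Object received = link.exchange(UNIT);
            producer.join();
            if (producerGot[0] != UNIT) {
                throw new IllegalStateException("producer did not receive the Unit");
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```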

Alex

(In reply to Alexei Kaigorodov via Concurrency-interest, Sat, 21 Dec 2019, 03:40)

Re: java.util.concurrent.Flow with opposite direction of item flow

/This symmetry is superficial./
Yes, it is.
Note that you were the first to use the word symmetry. I did not claim there
must be symmetry; I only said:
/Producers must play an active role and so be like Subscribers, able to call
Subscription#request() at their own discretion/
So Producers must be able to call Subscription#request(), that's it. The
data transfer is made differently compared to communication between
Flow.Publishers and Flow.Subscribers.
/ the return value of the put()/
BlockingQueue#put() has no return value. Maybe you meant take()? Anyway, I
am sure we can stay within the bounds of BlockingQueue; introducing any other
blocking methods only makes life harder.




Re: java.util.concurrent.Flow with opposite direction of item flow

Yes, I said symmetry, because that's what I can think of when justifying why producer "must" be able to "call request" "at their own discretion". In other words, if you object to symmetry, you need to find a justification for the claim that the Producer "must" be able to "call request".

Alex

(In reply to Alexei Kaigorodov via Concurrency-interest, Sat, 21 Dec 2019, 07:17)

Re: java.util.concurrent.Flow with opposite direction of item flow

The justification is obvious: in the synchronous case, the communication is
initiated by the Producer: it calls BlockingQueue#put(item), while the queue
does not call anything like Producer#get(). We want to convert synchronous
communication to asynchronous with minimal effort, so we still need an active
Producer and a passive Queue. And since in the Publisher/Subscriber pair the
Publisher is passive and the Subscriber is active, AsyncQueue must act like a
Publisher and the Producer must act like a Subscriber.




Re: java.util.concurrent.Flow with opposite direction of item flow

No, that's far from obvious. If you follow the half-queue example, you will see that the communication is initiated by the consumer initialising the buffer with Unit objects. (Ok, array full of nulls in Java world. Only without the half-queue example it is harder to see that it is the consumer that is doing it)

Alex

(In reply to Alexei Kaigorodov via Concurrency-interest, Sat, 21 Dec 2019, 07:34)

Re: java.util.concurrent.Flow with opposite direction of item flow

No, I did not follow the half-queue example. I have a BlockingQueue with a
strictly defined interface, and want to access it asynchronously. I do not
need to access fictional half-queues, where /the consumer initialising the
buffer with Unit objects./ In real life, BlockingQueue implementations
initialize the buffer without the help of a consumer. Your half-queues may
behave in very strange ways, but they are not related to the problem I am
trying to solve: how to augment existing means of inter-thread communication
with asynchronous interfaces, so that they can connect threads and
asynchronous tasks in any combination.


