draft Carrier API

Re: draft Carrier API

JSR166 Concurrency mailing list
Thanks, that will probably work. Or some tryReceive version like poll with timeout. Passing max_value is essentially blocking until closed or a value arrives (or spurious wakeup-like condition). 

Alex

On Tue, 10 Mar 2020, 11:30 Doug Lea via Concurrency-interest, <[hidden email]> wrote:
On 3/9/20 3:29 PM, Alex Otenko wrote:
> IllegalStateException is ok if receiver should've known there are no
> more items to receive. This is a good idea in cases with definite length
> of stream, and the length being known to the receiver before entering
> receive(). This doesn't seem like a good idea for indefinite length
> cases - like, loop to read all items until eof.
>
This is the reason for:
    Stream<T> stream();             // destructive (consume-on-traverse)
But it is also sensible to provide a simpler forEach analog:
    long consumeEach(Consumer<? super T> proc); // return count

For those who need stateful loops, we could add "eventually" forms of
tryReceive. With non-value-types, the preferable form that can co-exist
with value-types is usually to return a resultIfAbsent (that is almost
always chosen to be null), and for value types, Optional. To avoid
annoying people, we should probably have both.

    T tryReceive(T resultIfAbsent); // resultIfAbsent if closed or empty
    Optional<T> tryReceive();       // Optional.empty if closed or empty

    T tryReceiveEventually(T resultIfAbsent); // resultIfAbsent if closed
    Optional<T> tryReceiveEventually(); // Optional.empty if closed

Maybe there is a better method name.

(See updates at http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java)

-Doug
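To make the "stateful loop" case concrete, here is a minimal sketch of a consumer that keeps calling an "eventually" form until the carrier is closed and drained. ToyCarrier is a hypothetical stand-in built on LinkedBlockingQueue, not the draft Carrier class; only the method names trySend, tryReceive and tryReceiveEventually are taken from the messages above. The 10 ms polling interval and the CancellationException on interrupt are assumptions made for illustration.

```java
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration; NOT the draft Carrier class.
final class ToyCarrier<T> {
    private final LinkedBlockingQueue<T> items = new LinkedBlockingQueue<>();
    private volatile boolean closedForSending;

    // The closed check and the enqueue share one lock, so no send slips in after close.
    synchronized boolean trySend(T item) {
        return !closedForSending && items.offer(item);
    }

    synchronized void closeForSending() {
        closedForSending = true;
    }

    // Optional.empty if nothing is buffered right now (closed or merely empty).
    Optional<T> tryReceive() {
        return Optional.ofNullable(items.poll());
    }

    // Waits for an item; Optional.empty only once closed for sending and drained.
    Optional<T> tryReceiveEventually() {
        while (true) {
            boolean closedBeforePoll = closedForSending;
            try {
                T item = items.poll(10, TimeUnit.MILLISECONDS); // toy polling interval
                if (item != null) return Optional.of(item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CancellationException("interrupted while receiving");
            }
            // Closed before the poll and still nothing buffered: no more items can arrive.
            if (closedBeforePoll) return Optional.empty();
        }
    }
}

final class TryReceiveDemo {
    // Stateful loop: accumulate until the carrier reports "closed and empty".
    static int drainSum(ToyCarrier<Integer> c) {
        int sum = 0;
        Optional<Integer> next;
        while ((next = c.tryReceiveEventually()).isPresent()) {
            sum += next.get();
        }
        return sum;
    }

    public static void main(String[] args) {
        ToyCarrier<Integer> c = new ToyCarrier<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) c.trySend(i);
            c.closeForSending();
        });
        producer.start();
        System.out.println(drainSum(c)); // prints 6
    }
}
```

The loop needs no isClosed check and no exception handling for the end-of-stream case, which is the point of the "eventually" forms.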

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: draft Carrier API

JSR166 Concurrency mailing list
As an API comparison point, Clojure's CSP library is called core.async. It features channels and send/receive operations, and the CSP select operation called "alts". The alts op is a plain function (no compiler magic) which non-deterministically chooses the first operation that can proceed, whether it is a channel put or a take. Channel buffers are polymorphic (fixed, sliding and dropping; or no buffer, meaning rendezvous semantics).

Channel put returns whether the put succeeded (false if channel is closed).  Critically, the channel isClosed predicate is not exposed publicly, as it is hard to use without a TOCTOU bug.  Channel take receives the item (which is any reference, including Boolean false!) or receives nil when a channel is closed. There is only a single concrete implementation of a channel, ManyToManyChannel, but the underlying interfaces are split into read/write.

When is it useful to use the exception throwing variants of Carrier, rather than trySend/tryReceive? Seems like there is a whole lot of API surface area to deal with channels being closed or closing.

From a receiver's point of view, the distinction between shutdown & closed seems arbitrary. You're not done receiving until a buffer (if present) drains. Over the years, I have found from my usage of core.async that the most useful ops around termination are: a sender signaling that they're done, or a receiver abandoning the interaction early. Generally only one side is in charge of closing a channel, but in the consumer abandonment scenario, the producer will detect that the channel has closed during its next put (and then walk away, too).
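A rough sketch of the put-returns-boolean shape described above, in Java. ToyChannel is a hypothetical class written for this illustration (it is neither core.async nor the draft Carrier); it deliberately exposes no isClosed predicate, so the producer learns about closure only from the result of its next put.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical channel for illustration; not core.async and not the draft Carrier.
final class ToyChannel<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private boolean closed;

    // The closed check and the enqueue happen under one lock, so callers
    // never need a separate (and racy) isClosed() check before putting.
    synchronized boolean put(T item) {
        if (closed) return false;
        buffer.add(item);
        return true;
    }

    // Next buffered item, or null if nothing is buffered (this toy does not block).
    synchronized T poll() {
        return buffer.poll();
    }

    synchronized void close() {
        closed = true;
    }
}

final class AbandonDemo {
    // Producer keeps putting until the channel reports closure;
    // returns how many puts succeeded.
    static int produce(ToyChannel<Integer> ch, int max) {
        int sent = 0;
        while (sent < max && ch.put(sent)) {
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        ToyChannel<Integer> ch = new ToyChannel<>();
        System.out.println(produce(ch, 3)); // prints 3
        ch.close();                         // consumer abandons the interaction
        System.out.println(produce(ch, 3)); // prints 0: first put already fails
    }
}
```

Items buffered before the close can still be drained by poll, which matches the remark that a receiver is not done until the buffer empties.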

I don't understand the distinction between send+timeout and sendSynchronously(). Isn't synchronicity more a property of whether a buffer is present or not (rendezvous channel)?  Same question re: tryReceiveEventually

I love that there is something CSP-like going into the JVM - it will be killer when Loom drops.

On Tue, Mar 10, 2020 at 7:49 AM Alex Otenko via Concurrency-interest <[hidden email]> wrote:
...

Re: draft Carrier API

JSR166 Concurrency mailing list
I think the mapping between poll and take of a blocking queue and the various versions of receive is fairly obvious. Same for offer, put and the various send methods, including the versions with timeouts. The substantial difference is in dealing with "closed" states in blocking send/receive. It is difficult to signal that state without changing the return type. Returning arbitrary references or null is a source of errors. I see returning Optional as a necessary deviation from the direct mapping of the blocking put/take, but dealing with Optional can also be awkward. I guess we'll wait and see.

sendSynchronously is not just a wait for send to succeed; it also ensures that the corresponding receive consuming the sent item has occurred. You can see this as an ack of that item and all the preceding items, although I don't know if the API goes as far as guaranteeing the preceding receives.
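The closest existing JDK behaviour is LinkedTransferQueue.transfer, which Doug's design notes name as the BlockingQueue-side counterpart of sendSynchronously. The sketch below uses only that real class; the TransferDemo wrapper and its handOff helper are names made up for the illustration. It contrasts put (returns at once, item stays buffered) with transfer (returns only after a consumer has received the item).

```java
import java.util.concurrent.LinkedTransferQueue;

final class TransferDemo {
    // Starts one consumer, then hands the item over with transfer().
    // Returns true if nothing was left buffered when transfer() returned.
    static boolean handOff(LinkedTransferQueue<String> q, String item) {
        Thread consumer = new Thread(() -> {
            try {
                q.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            q.transfer(item);      // blocks until a consumer has received the item
            boolean empty = q.isEmpty();
            consumer.join();
            return empty;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        System.out.println(q.tryTransfer("x")); // false: no consumer waiting, nothing enqueued
        q.put("y");                             // plain send: returns immediately
        System.out.println(q.size());           // 1: the item is merely buffered
        q.poll();
        System.out.println(handOff(q, "z"));    // true: the item was received, not buffered
    }
}
```

Whether the draft API also guarantees that all preceding items were received is the open question raised above; transfer on a FIFO queue with a single consumer happens to give that, but the sketch does not demonstrate it.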


Alex

On Wed, 11 Mar 2020, 03:10 Ghadi Shayban, <[hidden email]> wrote:
...

Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
The method sendSynchronously() requires wrapping each message in an envelope
that also contains a reference to the sender, in order to pass the ack signal.
This is a pure performance loss in the (most frequent) cases where
acknowledgement is not required. Besides, this method makes little sense. It
notifies the sender that the message has left the carrier, but did it reach
the destination? There can be other intermediate steps between the sender and
the destination. I propose to remove this method.



--
Sent from: http://jsr166-concurrency.10961.n7.nabble.com/
Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list

> On 11 Mar 2020, at 05:00, John Rose <[hidden email]> wrote:
>
> On Mar 8, 2020, at 6:37 AM, Doug Lea <[hidden email]> wrote:
>>
>> interface CarrierSender<T> extends Carriable<T> {
>>   void finishSending();           // disable sending; close when empty
>>   // ...
>> }
>
> This rings true to me (but I’m not an expert in these APIs).
>
> Would there be a use case where the sender side of a connection
> is the subject of a try-with-resources, where the end of the block
> needs to issue a `finishSending` call?  In that case, the sender
> side (in isolation from the whole channel) might benefit from
> a view object whose close method is an alias for `finishSending`.


Yeah, I was going to ask something similar, about how we see common
usage of senders. I suspect that a lot (the majority?) of folk will
want orderly shutdown on the sender-side, so will invoke
`shutdownSending`, which first disables further sending, and secondly
closes when empty.

For scenarios where the receiver and sender do not have intimate or
direct knowledge of each other, then I'm not sure if it makes all that
much sense to use a try-with-resources on the sender side - unless it is
being used as an abort mechanism if orderly-close has not been completed
by the end of the try-with-resources block.

If we think that orderly-close will be more common than abortive-close,
then it could be made the default, and an explicit method added to
CarrierSender for abortive-close. This would more easily facilitate the
use within try-with-resources blocks - but of course the resource is not
really "freed", that depends on a well-behaved receiver. I don't think
that this would have any adverse effect on the receiver.

-Chris.
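A minimal sketch of the view object John describes, under the assumption that orderly close is what the end of the block should trigger. ToySender, SenderScope and RecordingSender are hypothetical names invented here; only finishSending comes from the quoted interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal sender; only finishSending() is taken from the quoted sketch.
interface ToySender<T> {
    void send(T item);
    void finishSending();   // disable sending; close when empty
}

// View whose close() is an alias for finishSending(), usable in try-with-resources.
final class SenderScope<T> implements AutoCloseable {
    private final ToySender<T> sender;

    SenderScope(ToySender<T> sender) {
        this.sender = sender;
    }

    void send(T item) {
        sender.send(item);
    }

    @Override
    public void close() {   // declares no checked exception, so no catch is needed
        sender.finishSending();
    }
}

// Test double that records what was sent and whether sending was finished.
final class RecordingSender implements ToySender<String> {
    final List<String> log = new ArrayList<>();
    boolean finished;

    public void send(String item) {
        if (finished) throw new IllegalStateException("closed for sending");
        log.add(item);
    }

    public void finishSending() {
        finished = true;
    }
}

final class SenderScopeDemo {
    static RecordingSender run() {
        RecordingSender s = new RecordingSender();
        try (SenderScope<String> scope = new SenderScope<>(s)) {
            scope.send("a");
            scope.send("b");
        } // finishSending() runs here, even if the body throws
        return s;
    }

    public static void main(String[] args) {
        RecordingSender s = run();
        System.out.println(s.log + " finished=" + s.finished); // [a, b] finished=true
    }
}
```

As noted above, leaving the block only stops further sends; buffered items are released when the receiver drains them.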

Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
I don't think so.

Count messages sent, and count messages returned by receive. When the receive count reaches the sequence number of the message sent using sendSynchronously, unblock that sendSynchronously call.

It is useful as a barrier. 
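The counting scheme can be sketched without any per-message envelope. CountingCarrier below is a hypothetical toy on top of LinkedBlockingQueue, written only to illustrate the idea; it is not how the draft implementation works, and it uses a single monitor for simplicity rather than performance.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical toy illustrating the counting idea; not the draft implementation.
final class CountingCarrier<T> {
    private final LinkedBlockingQueue<T> items = new LinkedBlockingQueue<>();
    private long sent;      // guarded by this
    private long received;  // guarded by this

    // Enqueue and number the item atomically, so queue order matches sequence order.
    private synchronized long enqueue(T item) {
        items.add(item);
        return ++sent;
    }

    void send(T item) {
        enqueue(item);
    }

    // Blocks until the receive count reaches this item's sequence number.
    void sendSynchronously(T item) throws InterruptedException {
        long seq = enqueue(item);
        synchronized (this) {
            while (received < seq) {
                wait();
            }
        }
    }

    T receive() throws InterruptedException {
        T item = items.take();          // taken outside the monitor to avoid blocking senders
        synchronized (this) {
            received++;
            notifyAll();
        }
        return item;
    }

    synchronized long receivedCount() {
        return received;
    }
}

final class BarrierDemo {
    // Returns the receive count observed right after sendSynchronously returns.
    static long run() {
        CountingCarrier<Integer> c = new CountingCarrier<>();
        c.send(1);
        c.send(2);
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) c.receive();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            c.sendSynchronously(3);     // acts as a barrier for items 1..3
            return c.receivedCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 3
    }
}
```

Because the queue is FIFO, a receive count of at least seq implies that the item numbered seq, and everything sent before it, has been taken.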

Alex

On Wed, 11 Mar 2020, 08:23 Alexei Kaigorodov via Concurrency-interest, <[hidden email]> wrote:
...

Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
Hi,

We have a similar API in Kotlin [1] with both sending and receiving parts; happy to see j.u.c moving toward a similar API shape!

I have a few notes based on our experience with Kotlin Channel's API:

1) isFull method. We used to have it as well and eventually decided to deprecate it. We didn't find any compelling use-cases, its semantics are vague (e.g. is "TransferCarrier" always full?), and it is hard to make it linearizable with the rest of the operations for buffered carriers.

2) receive throws ClosedException (that is IllegalStateException). To be somewhat consistent with the Collections API, we've decided that an attempt to receive from a closed carrier/channel is similar to 'remove' from an empty queue and thus should be NoSuchElementException. A send attempt on a closed channel, though, is still IllegalStateException. Maybe it is something worth considering here as well.

3) trySend. We have an analogue called offer, that also returns boolean, **but** may throw an exception if a channel was closed with an exception (aka "failure" rather than a normal close). In retrospect, this was not the best design decision: users do not expect a method with a signature 'boolean offer(value)' to throw. We currently are thinking about migrating users to 'trySend' that never throws.
The question here is whether to return boolean or some value-type like "Try<Boolean>" or "Either<Boolean|Throwable>" to avoid check-and-act misuse. Otherwise, it is impossible to deterministically know whether 'trySend' failed because the source was closed or because the buffer was full.

4) Conflation. There are primitives (in Channels, Rx, etc.) that allow value conflation: if there is no place in a (usually, single-slot) buffer on send, the current value just gets overwritten.
I am not sure whether such API is in the scope of Carrier API, but if so, contracts around Carriable methods (capacity, estimatedSize, close etc.) require really careful wording to make conflation fit here.

5) Send atomicity along with cancellation. If one sends a closeable resource via carrier, is there any guarantee on when it is safe to close a resource on the sending side (because a carrier is closed and there are no receivers) and when it is not? E.g. if 'send' throws ClosedException or CancellationException, is it safe to close an item in a catch block?
"upon timeout, the item is no longer available" on send with timeout really caught my eye here.


[1]  https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
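One possible shape for point 3, sketched in Java with a plain enum instead of Try or Either. SendResult and BoundedToyChannel are hypothetical names invented for this illustration; neither exists in the draft Carrier API or in kotlinx.coroutines.

```java
import java.util.ArrayDeque;

// Hypothetical result type: distinguishes "full" from "closed" in one atomic answer.
enum SendResult { SENT, FULL, CLOSED }

// Hypothetical bounded channel for illustration only.
final class BoundedToyChannel<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private boolean closed;

    BoundedToyChannel(int capacity) {
        this.capacity = capacity;
    }

    // Never throws; the caller does not need a follow-up isClosed()/isFull() check.
    synchronized SendResult trySend(T item) {
        if (closed) return SendResult.CLOSED;
        if (buffer.size() >= capacity) return SendResult.FULL;
        buffer.addLast(item);
        return SendResult.SENT;
    }

    synchronized void close() {
        closed = true;
    }
}

final class SendResultDemo {
    public static void main(String[] args) {
        BoundedToyChannel<String> ch = new BoundedToyChannel<>(1);
        System.out.println(ch.trySend("a")); // SENT
        System.out.println(ch.trySend("b")); // FULL: retrying later may succeed
        ch.close();
        System.out.println(ch.trySend("c")); // CLOSED: retrying is pointless
    }
}
```

A result like this also bears on point 5: for anything other than SENT the item never entered the channel, so the sender still owns it and can close it safely.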
--
Best regards,
Tolstopyatov Vsevolod


On Fri, Mar 6, 2020 at 6:22 PM Doug Lea <[hidden email]> wrote:

[Cross-posting concurrency-interest and loom-dev.]

To continue improving java.util.concurrent support for increasingly
diverse programming styles (while still avoiding arguments about whether
any of them are best!), it would be helpful to provide "BlockingQueues
meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
A sketch is pasted below. To avoid mail-reader glitches, you might want
to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java

Suggestions and comments are welcome. An initial implementation class
(LinkedCarrier) should be available shortly after API issues settle;
others later.

...

// API sketches, with "public" omitted throughout

/**
 * A component for sending and receiving messages. Carriers support
 * usages similar to those of BlockingQueues, but additionally
 * implement AutoCloseable, and may be explicitly closed for sending,
 * receiving, or both. Carriers also provide policy-based control for
 * responses to Thread.interrupt while blocked (ignoring, cancelling
 * the current operation only, or closing the carrier). Concrete
 * implementation classes may be created with a given capacity (after
 * which method send will block waiting for available space), or
 * effectively unbounded, in which case method send will never block
 * but may fail with an OutOfMemoryError.
 *
 * Design notes:
 *
 * (1) Both send and receive methods are declared here, but allowing
 * either side to be permanently (vs eventually) closed for send-only
 * or receive-only components. This loses some static type checking
 * opportunities of separate send and receive APIs. However the class
 * includes methods (in the style of Collections.unmodifiableX) to
 * produce views that provide dynamic directionality enforcement.
 *
 * (2) This is an abstract class (rather than interface) providing
 * uniform Observer-style methods for Selectors and related
 * classes. The alternative is some sort of SPI.
 *
 * (3) To control interactions between Thread interrupts and state,
 * rather than throwing InterruptedExceptions, potentially blocking
 * methods rely on a provided policy to distinguish cancelling the
 * operation vs closing the carrier vs ignoring the interrupt. The
 * default is CANCEL, because it is the least constraining; for
 * example some mixed usages can catch CancellationException to then
 * close only when desired.
 *
 * (4) To broaden coverage of channel-based programming styles,
 * implementations support sendSynchronously, which is otherwise
 * available in BlockingQueues only as the poorly-named and underused
 * method LinkedTransferQueue.transfer.
 */
abstract class Carrier<T> implements AutoCloseable {
    Carrier(OnInterrupt policy);
    Carrier() { this(OnInterrupt.CANCEL); } // default

    // Basic messaging

    /**
     * Consume item, throw if isClosedForReceiving, block if empty.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    T receive() throws ClosedException, CancellationException;

    /**
     * Send item, throw if isClosedForSending, block if full.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    void send(T item) throws ClosedException, CancellationException;

    /** Send and block until item received */
    void sendSynchronously(T item)
        throws ClosedException, CancellationException;

    // Timeout versions
    T receive(Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void send(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void sendSynchronously(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    // Non-blocking access
    boolean trySend(T item);        // false if closed or full
    T tryReceive(T resultIfAbsent); // absent if closed or empty
    T peek(T resultIfAbsent);       // may false-positive

    // Termination
    void closeForSending();         // fully close when isClosedForReceiving
    void closeForReceiving();       // vice-versa
    void close();                   // immediate close
    void awaitClose() throws InterruptedException;
    void onClose(Runnable closeHandler); // run by thread triggering close

    // Status
    boolean isClosedForSending();
    boolean isClosedForReceiving();
    boolean isClosed();             // true if both sides closed
    boolean isOpen()                { return !isClosed(); }
    boolean isEmpty();
    boolean isFull();               // never true if unbounded
    long    capacity();             // Long.MAX_VALUE if unbounded
    OnInterrupt interruptPolicy();  // return policy

    // linkage support, noops here; locators are opaque cookie-like
    // identifiers
    protected void registerSource(Carrier<? super T> c, long locator) {}
    // notification of send or close by registered carrier
    protected void sourceEvent(long locator, boolean isClosed) {}

    // views to disable one direction; similar to Collections.unmodifiableX
    static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
    static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);

    // other possible utilities
    Stream<T> stream();             // destructive (consume-on-traverse)
    static <E> Carrier<E> discardingCarrier(); // /dev/null analog
    // TBD: selector as static factory method vs class (as below)
    // TBD: Flow (reactive stream) adaptors
}

class LinkedCarrier<T> extends Carrier<T> {
    // main linked implementation
    // coming soon, based on LinkedTransferQueue algorithms
}

class BufferedCarrier<T> extends Carrier<T> {
    // main array-based implementation(s)
    // coming later, with single- vs multiple- sink/source options
}

/**
 * A Carrier that aggregates sources established in its constructor.
 * The receive method blocks waiting for any to become available, then
 * returns the corresponding item. Selectors are always closed for
 * sending, and may become fully closed when all sources close.
 */
class Selector<T> extends Carrier<T> { // possibly a more specific name
    Selector(Carrier<? extends T> c, ...) {
        // for each c { c.registerSource(this, locatorFor(c)); }
    }
    boolean isClosedForSending() { return true; }
    // ...
}

/**
 * A policy for responding to Thread.interrupt in blocking methods in
 * classes implementing AutoCloseable
 */
enum OnInterrupt {
    IGNORE,  // continue waiting
    CANCEL,  // throw CancellationException
    CLOSE    // close and throw ClosedException
}

// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
    ClosedException(AutoCloseable c); // the closed component
    // ...
}
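To illustrate design note (3), here is a minimal sketch of how a blocking receive might apply the three interrupt policies. PolicyReceiver is a hypothetical toy over LinkedBlockingQueue, and the OnInterrupt enum is a local copy of the one in the sketch above. IllegalStateException stands in for the draft ClosedException (which extends it), and the toy does not wake receivers that are already blocked when another thread closes it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingQueue;

// Local copy of the policy enum from the API sketch.
enum OnInterrupt { IGNORE, CANCEL, CLOSE }

// Hypothetical toy; not the draft Carrier implementation.
final class PolicyReceiver<T> {
    private final BlockingQueue<T> items = new LinkedBlockingQueue<>();
    private final OnInterrupt policy;
    private volatile boolean closed;

    PolicyReceiver(OnInterrupt policy) {
        this.policy = policy;
    }

    void send(T item) {
        if (closed) throw new IllegalStateException("closed");
        items.add(item);
    }

    boolean isClosed() {
        return closed;
    }

    // Blocking receive that never throws InterruptedException.
    T receive() {
        boolean interrupted = false;
        try {
            while (true) {
                if (closed) throw new IllegalStateException("closed");
                try {
                    return items.take();
                } catch (InterruptedException e) {
                    switch (policy) {
                        case IGNORE:
                            interrupted = true;   // remember it, keep waiting
                            break;
                        case CANCEL:
                            throw new CancellationException("receive cancelled by interrupt");
                        case CLOSE:
                            closed = true;
                            throw new IllegalStateException("closed by interrupt");
                    }
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt(); // restore status for the caller
        }
    }
}

final class PolicyDemo {
    public static void main(String[] args) {
        PolicyReceiver<String> r = new PolicyReceiver<>(OnInterrupt.CANCEL);
        Thread.currentThread().interrupt();       // simulate an interrupt arriving
        try {
            r.receive();
        } catch (CancellationException e) {
            System.out.println("cancelled, carrier still open: " + !r.isClosed());
        }
    }
}
```

With CANCEL the carrier stays usable after the exception, which is the sense in which the note calls it the least constraining default.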


Separate implementations of CarrierSender and CarrierReceiver, and firing mechanism

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
If we make:
 - separate implementations of CarrierSender and CarrierReceiver,
 - a mechanism that watches their state and, when both instances are ready
(!CarrierSender.isEmpty() && !CarrierReceiver.isFull()) and not closed,
fires a user-defined method (say, run()),

then the whole implementation of the Carrier becomes trivial:

class CarrierImpl<T> implements Carrier<T> {
    CarrierReceiver<T> receiver;
    CarrierSender<T> sender;

    public CarrierImpl() {
        // initialize sender and receiver
    }

    // fired by the watching mechanism when both sides are ready
    protected void run() {
        if (receiver.isClosed()) {
            sender.close();
            return;
        }
        T item = receiver.receive();
        sender.send(item);
    }

    // a lot of delegating methods like
    void send(T item) throws ClosedException, CancellationException {
        sender.send(item);
    }
}


(I omitted some subtle nuances like how to provide serial execution of the
run() method and avoid violations of the message order).

This implementation is not the most performant, but it can be used as a basis
for numerous other use cases:

1) allow the user to override method run() and create a simple message
processor.
2) let CarrierSender implement Flow.Publisher and CarrierReceiver implement
Flow.Subscriber, and we have a basic implementation of Flow.Processor.
3) extend the firing mechanism so that it can watch an arbitrary set of
Carriables, and we have a basis for processors with an arbitrary number of
inputs and outputs, including pure producers and pure consumers.
4) expose implementations of CarrierReceiver and CarrierSender to the user and
allow the user to extend those classes. Very quickly, implementations of
org.reactivestreams.{Publisher, Subscriber} and
reactor.core.publisher.{Mono, Flux} would appear, and the JDK would become
interoperable with any existing and future synchronous and asynchronous
message-passing libraries.

The killer feature of this approach is the firing mechanism, which is easy
to implement.
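A rough sketch of what such a firing mechanism could look like, under simplifying assumptions: one input buffer, one bounded output buffer, and a single monitor that both serializes the fired step and preserves message order. FiringRelay and its method names are hypothetical and do not correspond to CarrierSender or CarrierReceiver; the user-defined run() body is modelled as a Function.

```java
import java.util.ArrayDeque;
import java.util.function.Function;

// Hypothetical sketch of the firing idea; names are invented for illustration.
final class FiringRelay<I, O> {
    private final ArrayDeque<I> input = new ArrayDeque<>();
    private final ArrayDeque<O> output = new ArrayDeque<>();
    private final int outputCapacity;
    private final Function<I, O> step;   // stands in for the user-defined run() body

    FiringRelay(int outputCapacity, Function<I, O> step) {
        this.outputCapacity = outputCapacity;
        this.step = step;
    }

    // Every state change re-evaluates the firing condition.
    synchronized void send(I item) {
        input.add(item);
        fire();
    }

    // Next processed item, or null if none is ready (non-blocking in this toy).
    synchronized O tryReceive() {
        O result = output.poll();
        fire();                          // freeing output space may enable another firing
        return result;
    }

    synchronized int pending() {
        return input.size();
    }

    // Firing rule: run the step while input is available and output has room.
    // Always called with the monitor held, so steps run serially and in order.
    private void fire() {
        while (!input.isEmpty() && output.size() < outputCapacity) {
            output.add(step.apply(input.poll()));
        }
    }
}

final class FiringDemo {
    public static void main(String[] args) {
        FiringRelay<Integer, Integer> relay = new FiringRelay<>(1, x -> x * 2);
        relay.send(1);
        relay.send(2);
        relay.send(3);
        System.out.println(relay.pending());     // 2: output is full, two inputs wait
        System.out.println(relay.tryReceive());  // 2
        System.out.println(relay.tryReceive());  // 4
        System.out.println(relay.tryReceive());  // 6
    }
}
```

Running the step while holding the monitor is the simplest way to get serial execution; a real implementation would more likely hand the step to an executor and track a "running" flag instead.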




