draft Carrier API


JSR166 Concurrency mailing list

[Cross-posting concurrency-interest and loom-dev.]

To continue improving java.util.concurrent support for increasingly
diverse programming styles (while still avoiding arguments about whether
any of them are best!), it would be helpful to provide "BlockingQueues
meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
A sketch is pasted below. To avoid mail-reader glitches, you might want
to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java

Suggestions and comments are welcome. An initial implementation class
(LinkedCarrier) should be available shortly after API issues settle;
others later.

...

// API sketches, with "public" omitted throughout

/**
 * A component for sending and receiving messages. Carriers support
 * usages similar to those of BlockingQueues, but additionally
 * implement AutoCloseable, and may be explicitly closed for sending,
 * receiving, or both. Carriers also provide policy-based control for
 * responses to Thread.interrupt while blocked (ignoring, cancelling
 * the current operation only, or closing the carrier). Concrete
 * implementation classes may be created with a given capacity (after
 * which method send will block waiting for available space), or
 * effectively unbounded, in which case method send will never block
 * but may fail with an OutOfMemoryError.
 *
 * Design notes:
 *
 * (1) Both send and receive methods are declared here, but allowing
 * either side to be permanently (vs eventually) closed for send-only
 * or receive-only components. This loses some static type checking
 * opportunities of separate send and receive APIs. However the class
 * includes methods (in the style of Collections.unmodifiableX) to
 * produce views that provide dynamic directionality enforcement.
 *
 * (2) This is an abstract class (rather than interface) providing
 * uniform Observer-style methods for Selectors and related
 * classes. The alternative is some sort of SPI.
 *
 * (3) To control interactions between Thread interrupts and state,
 * rather than throwing InterruptedExceptions, potentially blocking
 * methods rely on a provided policy to distinguish cancelling the
 * operation vs closing the carrier vs ignoring the interrupt. The
 * default is CANCEL, because it is the least constraining; for
 * example some mixed usages can catch CancellationException to then
 * close only when desired.
 *
 * (4) To broaden coverage of channel-based programming styles,
 * implementations support sendSynchronously, which is otherwise
 * available in BlockingQueues only as the poorly-named and underused
 * method LinkedTransferQueue.transfer.
 */
abstract class Carrier<T> implements AutoCloseable {
    Carrier(OnInterrupt policy);
    Carrier() { this(OnInterrupt.CANCEL); } // default

    // Basic messaging

    /**
     * Consume item, throw if isClosedForReceiving, block if empty.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    T receive() throws ClosedException, CancellationException;

    /**
     * Send item, throw if isClosedForSending, block if full.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    void send(T item) throws ClosedException, CancellationException;

    /** Send and block until item received */
    void sendSynchronously(T item)
        throws ClosedException, CancellationException;

    // Timeout versions
    T receive(Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void send(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void sendSynchronously(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    // Non-blocking access
    boolean trySend(T item);        // false if closed or full
    T tryReceive(T resultIfAbsent); // absent if closed or empty
    T peek(T resultIfAbsent);       // may false-positive

    // Termination
    void closeForSending();         // fully close when isClosedForReceiving
    void closeForReceiving();       // vice-versa
    void close();                   // immediate close
    void awaitClose() throws InterruptedException;
    void onClose(Runnable closeHandler); // run by thread triggering close

    // Status
    boolean isClosedForSending();
    boolean isClosedForReceiving();
    boolean isClosed();             // true if both sides closed
    boolean isOpen()                { return !isClosed(); }
    boolean isEmpty();
    boolean isFull();               // never true if unbounded
    long    capacity();             // Long.MAX_VALUE if unbounded
    OnInterrupt interruptPolicy();  // return policy

    // linkage support, noops here; locators are opaque cookie-like
    // identifiers
    protected void registerSource(Carrier<? super T> c, long locator) {}
    // notification of send or close by registered carrier
    protected void sourceEvent(long locator, boolean isClosed) {}

    // views to disable one direction; similar to Collections.unmodifiableX
    static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
    static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);

    // other possible utilities
    Stream<T> stream();             // destructive (consume-on-traverse)
    static <E> Carrier<E> discardingCarrier(); // /dev/null analog
    // TBD: selector as static factory method vs class (as below)
    // TBD: Flow (reactive stream) adaptors
}

class LinkedCarrier<T> extends Carrier<T> {
    // main linked implementation
    // coming soon, based on LinkedTransferQueue algorithms
}

class BufferedCarrier<T> extends Carrier<T> {
    // main array-based implementation(s)
    // coming later, with single- vs multiple- sink/source options
}

/**
 * A Carrier that aggregates sources established in its constructor.
 * The receive method blocks waiting for any to become available, then
 * returns the corresponding item. Selectors are always closed for
 * sending, and may become fully closed when all sources close.
 */
class Selector<T> extends Carrier<T> { // possibly a more specific name
    Selector(Carrier<? extends T> c, ...) {
        // for each c { c.registerSource(this, locatorFor(c)); }
    }
    boolean isClosedForSending() { return true; }
    // ...
}

/**
 * A policy for responding to Thread.interrupt in blocking methods in
 * classes implementing AutoCloseable
 */
enum OnInterrupt {
    IGNORE,  // continue waiting
    CANCEL,  // throw CancellationException
    CLOSE    // close and throw ClosedException
}

// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
    ClosedException(AutoCloseable c); // the closed component
    // ...
}
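
Design note (3) can be illustrated with a small wrapper around an ordinary BlockingQueue. This is only a sketch under stated assumptions: PolicyQueue and the String-message ClosedException below are hypothetical stand-ins, not the proposed implementation, and the draft does not say whether CANCEL should restore the thread's interrupt status (this sketch leaves it cleared).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; not the proposed Carrier implementation.
class InterruptPolicyDemo {
    enum OnInterrupt { IGNORE, CANCEL, CLOSE }

    // Stand-in for the draft's ClosedException (assumed shape).
    static class ClosedException extends IllegalStateException {
        ClosedException(String message) { super(message); }
    }

    static class PolicyQueue<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private final OnInterrupt policy;
        private volatile boolean closed;

        PolicyQueue(OnInterrupt policy) { this.policy = policy; }

        boolean isClosed() { return closed; }

        void send(T item) {
            if (closed) throw new ClosedException("closed");
            queue.add(item); // unbounded, so this never blocks
        }

        // Blocks until an item arrives; interrupts are mapped through the policy
        // instead of surfacing as InterruptedException.
        T receive() {
            boolean interrupted = false;
            try {
                for (;;) {
                    if (closed) throw new ClosedException("closed");
                    try {
                        return queue.take();
                    } catch (InterruptedException e) {
                        switch (policy) {
                            case IGNORE:
                                interrupted = true; // remember it, keep waiting
                                break;
                            case CANCEL:
                                throw new CancellationException("receive cancelled");
                            case CLOSE:
                                closed = true;
                                throw new ClosedException("closed by interrupt");
                        }
                    }
                }
            } finally {
                // Under IGNORE, re-assert the interrupt for the caller to observe.
                if (interrupted) Thread.currentThread().interrupt();
            }
        }
    }
}
```

A caller using CANCEL can catch CancellationException and decide for itself whether to close, which is the sense in which CANCEL is the least constraining default.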

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: draft Carrier API

JSR166 Concurrency mailing list
Hi. I have a few comments:

onClose(Runnable): will this allow at most one handler, many handlers, or will calling it multiple times replace the handler and run the old one? Calling onClose for the first time after the carrier was closed runs the Runnable immediately, right?

tryReceive(T def): it is always somewhat troublesome to have to supply a default value of T; perhaps return Optional<T> if T is planned to be non-null.

stream(): I presume closing the stream closes the carrier.

+ receive(Consumer): boolean: could allow receiving without making up a default T.

+ closeExceptionally(Throwable): perhaps for both sides? Certainly, one can send a record of T+Throwable one way, but not the other.

+ onSenderReady(Runnable r): would allow a non-blocking consumer to react to items or sender-side close.
+ onReceiverReady(Runnable r): would allow a non-blocking producer to react to buffer slots becoming available or receiver-side close.

+ receiveAsPublisher(Executor): 
  ~ probably should only allow one subscriber and rely on external multicasting
  ~ without onSenderReady, it has to rely on blocking and thus run on a suspendable thread

+ sendAsSubscriber(Executor)
  ~ a full buffer requires suspending so that when a slot becomes available, the upstream can know to send more items
  ~ without onReceiverReady and blocking on send, I'm not sure how to link cancel to a receiver-side close - depends on how onClose allows registering multiple runnables
  ~ unbounded capacity <-> unbounded request?



Doug Lea via Concurrency-interest <[hidden email]> wrote (on Fri, Mar 6, 2020, 16:22):


--
Best regards,
David Karnok


Re: draft Carrier API

JSR166 Concurrency mailing list
Can methods return something more useful than void? E.g., something that can be used to test progress? (I am afraid my imagination is limited to returning a ticket number, and a sequencer API to inspect whether send / close has a matching receive / await for such an event.)

sendSynchronously with timeout is ambiguous. If the timeout occurs, was the item placed in the buffer but not yet received, or was it never buffered at all?

Alex

On Fri, 6 Mar 2020, 15:22 Doug Lea, <[hidden email]> wrote:


Re: draft Carrier API

JSR166 Concurrency mailing list

Collecting replies/responses, with updated sketches pasted below and at
http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java

On 3/6/20 3:56 PM, John Rose wrote:

> This design puts both endpoints on one type, as opposed to two
> similar types, like InputStream and OutputStream. This leads to
> fewer types and objects (good) but broader ones. Broader is a little
> less good, since most use points only care about 1/2 of the methods;
> the other 1/2 is then noise.

The main API design issue here is that there are three (not two) views
of a Carrier: the protocol state (closed, empty, etc), sender-side, and
receiver-side operations. If you split them, you need at least four
interfaces/classes total (one to combine them). Doing this in a way that
does not result in nearly all usages needing the combined interface
requires other tradeoffs (like declaring some bookkeeping methods public
and breaking read-only-ness of base interface). But given the reaction
so far, I'm back to thinking this can be done in a way that more people
will prefer. (Aside: I've waffled on this many, many times, including
pre-j.u.c,
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html,
and in several previous Carrier drafts.)

So, back to a refreshed 4-interface version.
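
To make the tradeoff concrete, here is a miniature of the four-type split. It is a sketch only: State, Sender, Receiver, Channel and QueueChannel are hypothetical names chosen for illustration (the draft's names are Carriable, CarrierSender, CarrierReceiver and Carrier), and only a couple of non-blocking methods are shown.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature of the split; not the draft's interfaces.
class SplitViewsDemo {
    interface State { boolean isEmpty(); }                       // protocol state
    interface Sender<T> extends State { boolean trySend(T item); }
    interface Receiver<T> extends State { T tryReceive(T resultIfAbsent); }
    interface Channel<T> extends Sender<T>, Receiver<T> {}       // combined view

    static class QueueChannel<T> implements Channel<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        public boolean isEmpty() { return queue.isEmpty(); }
        public boolean trySend(T item) { return queue.offer(item); }
        public T tryReceive(T resultIfAbsent) {
            T item = queue.poll();
            return item != null ? item : resultIfAbsent;
        }
    }

    // Producer code is handed only the sender view; it cannot receive.
    static void produce(Sender<Integer> out) {
        for (int i = 1; i <= 3; i++) out.trySend(i);
    }

    // Consumer code is handed only the receiver view; it cannot send.
    static int drainAndSum(Receiver<Integer> in) {
        int sum = 0;
        for (Integer item; (item = in.tryReceive(null)) != null; ) sum += item;
        return sum;
    }

    public static void main(String[] args) {
        Channel<Integer> channel = new QueueChannel<>();
        produce(channel);
        System.out.println(drainAndSum(channel));
    }
}
```

The directionality is checked at compile time here, whereas the single-class draft relies on sendOnlyCarrier/receiveOnlyCarrier views that enforce it dynamically.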

On 3/6/20 1:04 PM, Dávid Karnok wrote:

> onClose(Runnable): will this allow one handler ...

Thanks for the prod. It is much better to define:
  /** Returns a CompletionStage that completes when closed. */
  CompletionStage<Carriable<T>> onClose();
In which case these questions and others are already answered, and we
can also omit the awaitClose method.
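
A minimal sketch of why a completion-based onClose settles the handler questions, assuming the stage is backed by a CompletableFuture. The Component class is hypothetical; only the CompletableFuture/CompletionStage calls are real JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class OnCloseDemo {
    // Hypothetical closeable component; only the onClose() shape follows the draft.
    static class Component implements AutoCloseable {
        private final CompletableFuture<Component> closed = new CompletableFuture<>();

        // A real implementation might return closed.minimalCompletionStage()
        // (Java 9+) so that callers cannot complete the stage themselves.
        CompletionStage<Component> onClose() { return closed; }

        boolean isClosed() { return closed.isDone(); }

        // complete() has an effect only the first time, so close() is idempotent.
        @Override public void close() { closed.complete(this); }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Component c = new Component();
        // Any number of handlers may be attached; none replaces another.
        c.onClose().thenRun(() -> log.add("first"));
        c.onClose().thenRun(() -> log.add("second"));
        c.close();  // both handlers run in the closing thread (relative order unspecified)
        c.close();  // no effect; handlers do not run twice
        // A handler attached after close runs immediately in the attaching thread.
        c.onClose().thenRun(() -> log.add("late"));
        return log;
    }
}
```

Blocking until close then needs no dedicated method: callers can use onClose().toCompletableFuture().join().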

>
> tryReceive(T def): always somewhat trouble to expect a default value
> of T, perhaps return Optional<T> if T is planned to be non-null.

I think it is best to keep this, but also add a variant of your other suggestion:
  boolean tryConsume(Consumer<? super T> proc); // false if closed or empty
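
A rough sketch of how the two non-blocking forms compare at a call site. Inbox is a hypothetical stand-in built on ConcurrentLinkedQueue, and treating "closed" as an immediate refusal is an assumption of this sketch rather than something the draft specifies.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class TryConsumeDemo {
    // Hypothetical receiver-side stand-in; not the proposed implementation.
    static class Inbox<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private volatile boolean closed;

        void add(T item) { queue.add(item); }
        void close() { closed = true; }

        // Passes one item to proc if available; false if closed or empty.
        boolean tryConsume(Consumer<? super T> proc) {
            if (closed) return false;
            T item = queue.poll();
            if (item == null) return false;
            proc.accept(item);
            return true;
        }

        // Returns one item if available; resultIfAbsent if closed or empty.
        T tryReceive(T resultIfAbsent) {
            if (closed) return resultIfAbsent;
            T item = queue.poll();
            return item != null ? item : resultIfAbsent;
        }
    }
}
```

With tryConsume the caller never has to invent a sentinel value of T; with tryReceive the caller must pick a resultIfAbsent that cannot be confused with a real item.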

> + closeExceptionally(Throwable) :

Yes, thanks; for the same reasons we added it to SubmissionPublisher. Also
adding a getClosedException() method.
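
For reference, this is how the existing SubmissionPublisher pair behaves (Java 9+). The sketch uses only the real JDK methods closeExceptionally, getClosedException, isClosed and submit; whether Carrier would mirror this behaviour exactly is an assumption.

```java
import java.util.concurrent.SubmissionPublisher;

class CloseExceptionallyDemo {
    static String demo() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // Close with a recorded cause instead of a plain close().
        publisher.closeExceptionally(new IllegalArgumentException("bad input"));
        Throwable cause = publisher.getClosedException();
        try {
            publisher.submit("ignored"); // a closed publisher rejects submissions
            return "unexpected";
        } catch (IllegalStateException expected) {
            return publisher.isClosed() + ":" + cause.getMessage();
        }
    }
}
```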

> + onSenderReady(Runnable r) receiveAsPublisher(Executor) sendAsSubscriber(Executor)

... among other possibilities. I'm leaving interplay with Flow as TBD
for now, in part because...

On 3/6/20 5:14 PM, Thomas May wrote:

> It also could introduce interesting patterns like multiplexing… (IE,
> having multiple receivers getting the same message)

We already have a good multicaster, SubmissionPublisher. But I'm still
not sure of the best linkages.

On 3/6/20 6:06 PM, Alex Otenko wrote:

> Can methods return something more useful than void? Eg something
> that can be used to test progress? (I am afraid my imagination is
> limited to returning a ticket number, and a sequencer API to inspect
> whether send / close has a matching receive / await for such an
> event)

I can't think of enough use cases to justify the added cost. Can you?

>
> SendSynchronously with timeout is ambiguous. If timeout occurs, was
> it placed in the buffer, and not received yet, or not even buffered?
>

The only thing you know is that upon exception, the item cannot have
been (and never will be) received. This is no different from other
methods (although you are right that bounded+synchronous+timeout is
the most complicated to implement).
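
The closest existing analog is the timed LinkedTransferQueue.tryTransfer, which reports a timeout with a false return rather than an exception. A small sketch, using only real JDK calls, of the same guarantee: when the wait elapses with no receiver, the element is not left enqueued, so it can never be received later.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

class TimedTransferDemo {
    // Returns { transferred, queueEmptyAfterwards } for a transfer with no receiver.
    static boolean[] demo() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        try {
            // No consumer is waiting, so this gives up after about 50 ms.
            boolean transferred = queue.tryTransfer("item", 50, TimeUnit.MILLISECONDS);
            return new boolean[] { transferred, queue.isEmpty() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```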

... pasting updated draft ...


// snapshot: Sat Mar  7 11:13:22 2020  Doug Lea  (dl at nuc40)

// API sketches, with "public" omitted throughout

/**
 * A component for sending and receiving data. Carriers support
 * usages similar to those of BlockingQueues, but additionally
 * implement AutoCloseable, and may be explicitly closed for sending,
 * receiving, or both.
 *
 * This interface combines three sets of methods, defined in three
 * interfaces: Carriable methods access protocol state and
 * configuration. Interfaces CarrierSender and CarrierReceiver extend
 * Carriable with sender- and receiver- side views. Finally, this
 * interface combines these views.
 *
 * To control interactions between Thread interrupts and state, rather
 * than throwing InterruptedExceptions, potentially blocking methods
 * rely on a provided policy to distinguish cancelling the operation
 * vs closing the carrier vs ignoring the interrupt. The default for
 * current implementations is CANCEL, because it is the least
 * constraining; for example some mixed usages can catch
 * CancellationException to then close only when desired.
 *
 * Concrete implementation classes may enforce a given capacity (after
 * which method send will block waiting for available space), or be
 * effectively unbounded, in which case method send will never block
 * but may fail with an OutOfMemoryError.
 */
interface Carrier<T> extends CarrierSender<T>, CarrierReceiver<T> {
    // TBD: factory methods for jdk implementations
    // some utility methods, such as...
    static <E> CarrierReceiver<E> discardingCarrier(); // /dev/null analog
    // TBD: Flow (reactive stream) adaptors
}

/**
 * Methods accessing the protocol state and configuration of a
 * Carrier.
 */
interface Carriable<T> extends AutoCloseable {
    boolean isClosedForSending();
    boolean isClosedForReceiving();
    boolean isClosed();             // true if both sides closed
    boolean isOpen();               // { return !isClosed(); }
    boolean isEmpty();
    boolean isFull();               // never true if unbounded
    long    capacity();             // Long.MAX_VALUE if unbounded
    OnInterrupt interruptPolicy();  // return policy

    void close();                   // immediate close both sides
    void closeExceptionally(Throwable cause); // record as cause
    Throwable getClosedException();

    /** Returns a CompletionStage that completes when closed. */
    CompletionStage<Carriable<T>> onClose();
}

/**
 * Methods defining the sender-side view of a Carrier.
 */
interface CarrierSender<T> extends Carriable<T> {
    /**
     * Send item, throw if isClosedForSending, block if full.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    void send(T item) throws ClosedException, CancellationException;

    /** Send and block until item received */
    void sendSynchronously(T item)
        throws ClosedException, CancellationException;

    /** Try to send; upon timeout, the item is no longer available. */
    void send(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void sendSynchronously(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    boolean trySend(T item);        // false if closed or full
    void closeForSending();         // fully close when isClosedForReceiving

    // linkage support; locators are opaque cookie-like identifiers
    void registerSource(CarrierSender<? super T> c, long locator);
}

/**
 * Methods defining the receiver-side view of a Carrier.
 */
interface CarrierReceiver<T> extends Carriable<T> {
    /**
     * Consume item, throw if isClosedForReceiving, block if empty.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    T receive() throws ClosedException, CancellationException;
    T receive(Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    T tryReceive(T resultIfAbsent); // absent if closed or empty
    T peek(T resultIfAbsent);       // may false-positive

    void closeForReceiving();       // fully close when isClosedForSending

    // false if closed or empty
    boolean tryConsume(Consumer<? super T> proc);
    Stream<T> stream();             // destructive (consume-on-traverse)

    // notification of send or close by registered source
    void sourceSent(CarrierSender<? extends T> source, long locator,
                    T item);
    void sourceClosed(CarrierSender<? extends T> source, long locator);
}

// TBD: provide abstract class AbstractCarrier<T>.

class LinkedCarrier<T> implements Carrier<T> {
    LinkedCarrier(OnInterrupt policy);
    LinkedCarrier() { this(OnInterrupt.CANCEL); } // default
    // main linked implementation
    // coming soon, based on LinkedTransferQueue algorithms
}

class BufferedCarrier<T> implements Carrier<T> {
    // main array-based implementation(s)
    // coming later, with single- vs multiple- sink/source options
}

/**
 * A Carrier that aggregates sources established in its constructor.
 * The receive method blocks waiting for any to become available, then
 * returns the corresponding item.
 */
class CarrierSelector<T> implements CarrierReceiver<T> {
    CarrierSelector(CarrierSender<? extends T>... sources) {
        // for each c in sources { c.registerSource(this, locatorFor(c)); }
    }
}

/**
 * A policy for responding to Thread.interrupt in blocking methods in
 * classes implementing AutoCloseable
 */
enum OnInterrupt {
    IGNORE,  // continue waiting
    CANCEL,  // throw CancellationException
    CLOSE    // close and throw ClosedException
}

// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
    ClosedException(AutoCloseable c); // the closed component
    // copied from ExecutionException:
    /**
     * Constructs a {@code ClosedException} with the specified cause.
     * The detail message is set to {@code (cause == null ? null :
     * cause.toString())} (which typically contains the class and
     * detail message of {@code cause}).
     *
     * @param  cause the cause (which is saved for later retrieval by the
     *         {@link #getCause()} method)
     */
    public ClosedException(Throwable cause) {
        super(cause);
    }
    // ...
}




_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: draft Carrier API

JSR166 Concurrency mailing list
> I can't think of enough use cases to justify the added cost. Can you?

Most primitives we have are really monolithic. You get only two ways of interacting with them: try, or get blocked indefinitely.

A ticket system lets you build nicer things, because essentially the primitive hands back some control over what to do during the wait. You can switch between try* and blocking after you have made the call, choosing when it is more suitable to block.

Alex

On Sat, 7 Mar 2020, 16:30 Doug Lea, <[hidden email]> wrote:

Collecting replies/responses, with updated sketches pasted below and at
http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java

On 3/6/20 3:56 PM, John Rose wrote:

> This design puts both endpoints on one type, as opposed to two
> similar types, like InputStream and OutputStream. This leads to
> fewer types and objects (good) but broader ones. Broader is little
> less good, since most use points only care about 1/2 of the methods;
> the other 1/2 is then noise.

The main API design issue here is that there are three (not two) views
of a Carrier: the protocol state (closed, empty, etc), sender-side, and
 receiver-side operations. If you split them, you need at least four
interfaces/classes total (one to combine them). Doing this in a way that
does not result in nearly all usages needing the combined interface
requires other tradeoffs (like declaring some bookkeeping methods public
and breaking read-only-ness of base interface). But given the reaction
so far, I'm back to thinking this can be done in a way that more people
will prefer. (Aside: I've waffled on this many many times, including
pre-j.u.c
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
, and several previous Carrier drafts.)

So, back to a refreshed 4-interface version.

On 3/6/20 1:04 PM, Dávid Karnok wrote:

> onClose(Runnable): will this allow one handler ...

Thanks for the prod. It is much better to define:
  /** Returns a CompletableFuture that isDone when closed. */
  CompletionStage<Carriable<T>> onClose();
In which case these questions and others are already answered, and we
can also omit awaitClose method.
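
As an illustration of why a CompletionStage-returning onClose() answers the "one handler or many" question, here is a minimal sketch. ClosableSketch and OnCloseDemo are hypothetical names made up for this example, not part of the draft; the only assumption is that close notification is backed by a CompletableFuture that is completed once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Carriable; only close notification is sketched. */
class ClosableSketch implements AutoCloseable {
    // Completed exactly once, by the first call to close().
    private final CompletableFuture<ClosableSketch> closed = new CompletableFuture<>();

    /** Read-only view (Java 9+), so callers cannot complete the future themselves. */
    CompletionStage<ClosableSketch> onClose() {
        return closed.minimalCompletionStage();
    }

    boolean isClosed() { return closed.isDone(); }

    @Override public void close() {
        closed.complete(this); // idempotent: later calls have no effect
    }
}

class OnCloseDemo {
    /** Registers handlers, closes twice, and returns how many handler runs occurred. */
    static int run() {
        AtomicInteger calls = new AtomicInteger();
        ClosableSketch c = new ClosableSketch();
        c.onClose().thenRun(calls::incrementAndGet);          // first handler
        c.onClose().thenAccept(x -> calls.incrementAndGet()); // second handler
        c.close();
        c.close();                                            // no extra notifications
        c.onClose().thenRun(calls::incrementAndGet);          // registered late, still runs
        return calls.get();
    }
}
```

Any number of handlers can be attached, each runs once, and a handler attached after close still runs, which is the usual CompletionStage behaviour rather than anything specific to Carrier.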

>
> tryReceive(T def): always somewhat trouble to expect a default value
> of T, perhaps return Optional<T> if T is planned to be non-null.

I think best to keep this, but also add a variant of your other suggestion:
  boolean tryConsume(Consumer<? super T> proc); // false if closed or empty

> + closeExceptionally(Throwable) :

Yes, thanks; for the same reasons we added to SubmissionPublisher. Also
adding getClosedException() method.

> + onSenderReady(Runnable r) receiveAsPublisher(Executor) sendAsSubscriber(Executor)

... among other possibilities. I'm leaving interplay with Flow as TBD
for now, in part because...

On 3/6/20 5:14 PM, Thomas May wrote:

> It also could introduce interesting patterns like multiplexing… (IE,
> having multiple receivers getting the same message)

We already have a good multicaster, SubmissionPublisher. But I'm still
not sure of the best linkages.

On 3/6/20 6:06 PM, Alex Otenko wrote:

> Can methods return something more useful than void? Eg something
> that can be used to test progress? (I am afraid my imagination is
> limited to returning a ticket number, and a sequencer API to inspect
> whether send / close has a matching receive / await for such an
> event)

I can't think of enough use cases to justify the added cost. Can you?

>
> SendSynchronously with timeout is ambiguous. If timeout occurs, was
> it placed in the buffer, and not received yet, or not even buffered?
>

The only thing you know is that, upon exception, the item has not been
(and never will be) received. This is no different from the other
methods (although you are right that bounded+synchronous+timeout is
the most complicated to implement).
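
An existing java.util.concurrent analog may make this guarantee concrete. The sketch below uses LinkedTransferQueue.tryTransfer with a timeout, which is only an analogy: it reports the timeout by returning false rather than throwing TimeoutException, and it is not the proposed LinkedCarrier. TimedTransferDemo is a name made up for the example.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

class TimedTransferDemo {
    /** Returns {transferred, itemStillQueued} for a timed hand-off with no receiver. */
    static boolean[] run() throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        // No consumer is waiting, so this waits about 50 ms and then gives up.
        boolean transferred = q.tryTransfer("item", 50, TimeUnit.MILLISECONDS);
        // After the timeout the element has been withdrawn, so a later poll finds nothing.
        boolean stillQueued = q.poll() != null;
        return new boolean[] { transferred, stillQueued };
    }
}
```

After the failed call the sender still owns the item and can retry or drop it, since no receiver can ever observe it.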

... pasting updated draft ...


// snapshot: Sat Mar  7 11:13:22 2020  Doug Lea  (dl at nuc40)

// API sketches, with "public" omitted throughout

/**
 * A component for sending and receiving data. Carriers support
 * usages similar to those of BlockingQueues, but additionally
 * implement AutoCloseable, and may be explicitly closed for sending,
 * receiving, or both.
 *
 * This interface combines three sets of methods, defined in three
 * interfaces: Carriable methods access protocol state and
 * configuration. Interfaces CarrierSender and CarrierReceiver extend
 * Carriable with sender- and receiver- side views. Finally, this
 * interface combines these views.
 *
 * To control interactions between Thread interrupts and state, rather
 * than throwing InterruptedExceptions, potentially blocking methods
 * rely on a provided policy to distinguish cancelling the operation
 * vs closing the carrier vs ignoring the interrupt. The default for
 * current implementations is CANCEL, because it is the least
 * constraining; for example some mixed usages can catch
 * CancellationException to then close only when desired.
 *
 * Concrete implementation classes may enforce a given capacity (after
 * which method send will block waiting for available space), or be
 * effectively unbounded, in which case method send will never block
 * but may fail with an OutOfMemoryError.
 */
interface Carrier<T> extends CarrierSender<T>, CarrierReceiver<T> {
    // TBD: factory methods for jdk implementations
    // some utility methods, such as...
    static <E> CarrierReceiver<E> discardingCarrier(); // /dev/null analog
    // TBD: Flow (reactive stream) adaptors
}

/**
 * Methods accessing the protocol state and configuration of a
 * Carrier.
 */
interface Carriable<T> extends AutoCloseable {
    boolean isClosedForSending();
    boolean isClosedForReceiving();
    boolean isClosed();             // true if both sides closed
    boolean isOpen();               // { return !isClosed(); }
    boolean isEmpty();
    boolean isFull();               // never true if unbounded
    long    capacity();             // Long.MAX_VALUE if unbounded
    OnInterrupt interruptPolicy();  // return policy

    void close();                   // immediate close both sides
    void closeExceptionally(Throwable cause); // record as cause
    Throwable getClosedException();

    /** Returns a CompletableFuture that isDone when closed. */
    CompletionStage<Carriable<T>> onClose();
}

/**
 * Methods defining the sender-side view of a Carrier.
 */
interface CarrierSender<T> extends Carriable<T> {
    /**
     * Send item, throw if isClosedForSending, block if full.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    void send(T item) throws ClosedException, CancellationException;

    /** Send and block until item received */
    void sendSynchronously(T item)
        throws ClosedException, CancellationException;

    /** Try to send; upon timeout, the item is no longer available. */
    void send(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void sendSynchronously(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    boolean trySend(T item);        // false if closed or full
    void closeForSending();         // fully close when isClosedForReceiving

    // linkage support; locators are opaque cookie-like identifiers
    void registerSource(CarrierSender<? super T> c, long locator);
}

/**
 * Methods defining the receiver-side view of a Carrier.
 */
interface CarrierReceiver<T> extends Carriable<T> {
    /**
     * Consume item, throw if isClosedForReceiving, block if empty.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    T receive() throws ClosedException, CancellationException;
    T receive(Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    T tryReceive(T resultIfAbsent); // absent if closed or empty
    T peek(T resultIfAbsent);       // may false-positive

    void closeForReceiving();       // fully close when isClosedForSending

    boolean tryConsume(Consumer<? super T> proc); // false if closed or empty
    Stream<T> stream();             // destructive (consume-on-traverse)

    // notification of send or close by registered source
    void sourceSent(CarrierSender<? extends T> source, long locator, T item);
    void sourceClosed(CarrierSender<? extends T> source, long locator);
}

// TBD: provide abstract class AbstractCarrier<T>.

class LinkedCarrier<T> implements Carrier<T> {
    LinkedCarrier(OnInterrupt policy);
    LinkedCarrier() { this(OnInterrupt.CANCEL); } // default
    // main linked implementation
    // coming soon, based on LinkedTransferQueue algorithms
}

class BufferedCarrier<T> implements Carrier<T> {
    // main array-based implementation(s)
    // coming later, with single- vs multiple- sink/source options
}

/**
 * A Carrier that aggregates sources established in its constructor.
 * The receive method blocks waiting for any to become available, then
 * returns the corresponding item.
 */
class CarrierSelector<T> implements CarrierReceiver<T> {
    CarrierSelector(CarrierSender<? extends T>... sources) {
        // for each c in sources { c.registerSource(this, locatorFor(c)); }
    }
}

/**
 * A policy for responding to Thread.interrupt in blocking methods in
 * classes implementing AutoCloseable
 */
enum OnInterrupt {
    IGNORE,  // continue waiting
    CANCEL,  // throw CancellationException
    CLOSE    // close and throw ClosedException
}

// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
    ClosedException(AutoCloseable c); // the closed component
    // copied from ExecutionException:
    /**
     * Constructs a {@code ClosedException} with the specified cause.
     * The detail message is set to {@code (cause == null ? null :
     * cause.toString())} (which typically contains the class and
     * detail message of {@code cause}).
     *
     * @param  cause the cause (which is saved for later retrieval by the
     *         {@link #getCause()} method)
     */
    public ClosedException(Throwable cause) {
        super(cause);
    }
    // ...
}
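
To make the three OnInterrupt policies concrete, here is a rough sketch of a receiver layered on a BlockingQueue. It is not the proposed LinkedCarrier: PolicyReceiver is a hypothetical class, IllegalStateException stands in for ClosedException (which the draft declares as a subclass of it), and re-asserting the interrupt after an IGNORE wait is an assumption the draft does not spell out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingQueue;

enum OnInterrupt { IGNORE, CANCEL, CLOSE }

/** Hypothetical receiver built on a BlockingQueue; not the proposed LinkedCarrier. */
class PolicyReceiver<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final OnInterrupt policy;
    private volatile boolean closed;

    PolicyReceiver(OnInterrupt policy) { this.policy = policy; }

    void send(T item) { queue.add(item); }
    boolean isClosed() { return closed; }

    T receive() {
        if (closed) throw new IllegalStateException("closed"); // up-front check only
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    switch (policy) {
                        case IGNORE: interrupted = true; break; // keep waiting
                        case CANCEL: throw new CancellationException("interrupted");
                        case CLOSE:  closed = true;
                                     throw new IllegalStateException("closed on interrupt");
                    }
                }
            }
        } finally {
            // Assumption: under IGNORE the interrupt is re-asserted once the wait ends.
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
}
```

A real implementation would also have to wake other blocked threads when CLOSE fires; this sketch only records the flag.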






Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
Send is declared to throw when the carrier is closed for sending. Is there a good reason not to throw when it is closed for receiving? Or what is the intended behavior in that case, given that send may block if full?

Alex


Re: draft Carrier API

JSR166 Concurrency mailing list
On 3/7/20 6:25 PM, Alex Otenko wrote:
> Send is declared to throw when the carrier is closed for sending. Is
> there a good reason not to throw when it is closed for receiving? Or
> what is the intended behavior in that case, given that send may block
> if full?
>

Normally, a completed receiver should invoke (bidirectional) close.
Calling closeForReceiving provides more flexibility, but with more cases
for users to consider (like stuck senders).

But this question invites considering whether even having
closeForReceiving would lead to more errors than correct usages.
Considering that some of the motivation for Carrier is to reduce
opportunities for errors people encounter with hand-made components
built from BlockingQueues etc, I think we could remove it.

-Doug



Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
On 3/7/20 6:17 PM, Alex Otenko wrote:

> A ticket system lets you build nicer things, because essentially the
> primitive hands back some control over what to do during the wait. You
> can switch between try* and blocking after you have made the call,
> choosing when it is more suitable to block.

In other words, you'd sometimes like Carrier to act more like
ExecutorService (obtaining a Future or something like it on submission).
Stay tuned for part 3 of j.u.c loom-related support for doing this.

-Doug
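
For readers wondering what a ticket or Future on submission could look like, here is a speculative sketch. TicketChannel is a hypothetical class invented for this note; it is not part of the draft and not the "part 3" support mentioned above. The assumption is simply that each send hands back a CompletableFuture that completes when the item is received.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical unbounded channel whose send returns a "ticket" (not part of the draft). */
class TicketChannel<T> {
    private static final class Entry<T> {
        final T item;
        final CompletableFuture<Void> ticket = new CompletableFuture<>();
        Entry(T item) { this.item = item; }
    }

    private final Queue<Entry<T>> entries = new ArrayDeque<>();

    /** Enqueues without blocking; the ticket completes once the item is received. */
    synchronized CompletableFuture<Void> send(T item) {
        Entry<T> e = new Entry<>(item);
        entries.add(e);
        return e.ticket;
    }

    /** Non-blocking receive: returns null if nothing is queued. */
    T tryReceive() {
        Entry<T> e;
        synchronized (this) { e = entries.poll(); }
        if (e == null) return null;
        e.ticket.complete(null); // signal the sender outside the lock
        return e.item;
    }
}
```

The sender can then poll with ticket.isDone(), block with ticket.join(), or use the timed ticket.get(timeout, unit), deciding after the call when blocking is appropriate.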



Re: draft Carrier API

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
I think it may be useful to clarify who is meant to call each of the close methods.

E.g., is it the receiver calling closeForSending to stop an inundation, and the sender calling closeForReceiving to signal end of stream? Maybe senderDone / receiverDone or some better names would help.

Also, it is not clear how receive is meant to behave for a stream with no more items. The sender closes its end; does the receiver block when empty, or throw? Throwing seems like the only way out, but seems really weird. A bit like Python raising StopIteration.

Alex


Re: draft Carrier API

JSR166 Concurrency mailing list
On 3/8/20 8:49 AM, Alex Otenko via Concurrency-interest wrote:
> I think it may be useful to clarify who is meant to call each of the
> close methods.

Right; thanks especially for the questions about what confused
programmers who aren't used to dealing with half-closed states might do.
 Probably best to stop using "close" except for full close. And kill
closeForReceiving. Leaving better names and simpler specs:

interface Carriable<T> extends AutoCloseable {
    boolean isClosed();
    boolean isFinishedSending();    // closed or sending disabled
    //...
}
interface CarrierSender<T> extends Carriable<T> {
    void finishSending();           // disable sending; close when empty
    // ...
}

Full versions as usual at: http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
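
A toy model may help readers follow the revised semantics (finishSending disables sending; the carrier becomes closed once it is empty). TinyCarrier below is a hypothetical, unbounded, monitor-based sketch and not the LinkedCarrier implementation; IllegalStateException stands in for ClosedException, and discarding the buffer on close() is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy unbounded carrier illustrating finishSending; not the draft implementation. */
class TinyCarrier<T> implements AutoCloseable {
    private final Queue<T> items = new ArrayDeque<>();
    private boolean finishedSending; // sending disabled
    private boolean closed;          // fully closed

    synchronized boolean isClosed() { return closed; }
    synchronized boolean isFinishedSending() { return finishedSending || closed; }

    synchronized boolean trySend(T item) {
        if (finishedSending || closed) return false;
        items.add(item);
        notifyAll();
        return true;
    }

    /** Disable sending; the carrier closes once buffered items are drained. */
    synchronized void finishSending() {
        finishedSending = true;
        if (items.isEmpty()) closed = true;
        notifyAll(); // wake receivers blocked on empty
    }

    /** Blocks while empty and still open; throws once closed and drained. */
    synchronized T receive() throws InterruptedException {
        while (items.isEmpty() && !closed) wait();
        if (items.isEmpty()) throw new IllegalStateException("closed");
        T item = items.remove();
        if (finishedSending && items.isEmpty()) closed = true; // last buffered item taken
        return item;
    }

    @Override public synchronized void close() {
        closed = true;
        finishedSending = true;
        items.clear(); // assumption: immediate close discards the buffer
        notifyAll();
    }
}
```

With two buffered items, finishSending() leaves isClosed() false until the second receive() returns, after which further receives throw.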


Re: draft Carrier API

JSR166 Concurrency mailing list
I am familiar with half-closed states, but from the socket world. The use of the same term here is lost on me.

In networking, a half-closed socket is really one fully closed pipe of a duplex pair. Here it is more like a half-closed single pipe: the same pipe is closed, just from different ends. So some of the states that become possible are really strange.

Calling it finish is better, but there may still be some confusion from reading it as an imperative instruction (telling the sender to "Finish!").

My understanding is that the intention is to capture the Carrier transitioning like so:

sending and receiving -> sending complete; the receiver can only drain buffered items; future send attempts are rejected

Or

sending and receiving -> receiving complete; no more items can be delivered; all buffered items are discarded; future attempts to send or receive are rejected

Is that right?

In the first case the receiver needs some hint about how many receives are sensible, and if a receive is blocked on empty when the sender transitions into the sending-done state, the receiver needs a nice way out. Throwing seems like a last resort, as exceptions typically indicate a transition into an error state. Returning Optional<T> could be better.

Alex


Re: draft Carrier API

JSR166 Concurrency mailing list
Naming things is maybe the hardest problem in computer science :) but in the end people will get used to the convention, whatever you choose.

But the type of receive() is going to affect how code is structured, so I would suggest spending some time understanding the implications.

Was: a blocking queue, which can be shown to be a co-list (take() is the destructor removing the head).

Now: a blocking queue with an end, so it becomes a representation of a list (possibly finite). At the type level it becomes a union of a co-list and a unit. The disjoint unit is either a special value returned (e.g. the -1 returned by InputStream.read) or a mutually exclusive continuation (onNext delivers the head vs. onComplete delivers the unit; fold gets a case for the empty list and a function for the case when the list has a head). So it is inevitable that the special value emerges in the API. Null, a special exception, and an empty Optional are probably all you can do in Java.

I'd think Optional is the best choice. Null is not safe. Exceptions require quite a bit of boilerplate, which can be annoying, especially given that the end of the stream does not reflect an error state.

Alex
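
The Optional-based encoding argued for above could be sketched as follows. OptionalChannel and DrainDemo are hypothetical names for illustration only; the draft API at this point in the thread still throws ClosedException, so this shows the suggested alternative and not the proposed behaviour.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical receiver whose receive() reports end-of-stream as an empty Optional. */
class OptionalChannel<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private boolean finished;

    synchronized void send(T item) {
        if (finished) throw new IllegalStateException("sending finished");
        items.add(Objects.requireNonNull(item)); // null is reserved for "absent"
        notifyAll();
    }

    synchronized void finishSending() { finished = true; notifyAll(); }

    /** Blocks while empty and unfinished; an empty result means no more items will arrive. */
    synchronized Optional<T> receive() throws InterruptedException {
        while (items.isEmpty() && !finished) wait();
        return Optional.ofNullable(items.poll());
    }
}

class DrainDemo {
    /** Drains the channel, stopping at the empty Optional instead of catching an exception. */
    static int sum(OptionalChannel<Integer> ch) throws InterruptedException {
        int total = 0;
        for (Optional<Integer> next; (next = ch.receive()).isPresent(); ) {
            total += next.get();
        }
        return total;
    }
}
```

The consumer loop reads much like the familiar read() != -1 idiom for InputStream, with the end marker carried in the type rather than in a sentinel value.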

On Sun, 8 Mar 2020, 16:36 Alex Otenko, <[hidden email]> wrote:
I am familiar with half closed states, but in the socket world. The use of the same term is lost on me.

In networking, half closed socket is really a fully closed pipe in a duplex. Here it is more of half closed singular pipe - closing one end of the same pipe,  just from different ends. So some of the states that become possible are really strange.

Calling it finish is better, but perhaps some confusion about possibly interpreting it as an imperative instruction is still there. (Telling the sender to "Finish!")

My understanding that the intention is to capture Carrier transitioning like so:

sending and receiving -> sending complete, receiver can only drain buffered items, reject future send attempts 

Or

sending and receiving->receiving complete, no more items can be delivered, discard all buffered items and reject future send attempts, reject future attempts to receive 

Is that right?

In the first case receiver needs some hint how many receives are sensible, and if a receive is blocked on empty when sender transitions into the sending done state, the receiver needs a nice way out. Throwing seems like last resort, as typically exceptions indicate transitioning into error states. Returning Optional<T> can be better.

Alex

On Sun, 8 Mar 2020, 13:37 Doug Lea, <[hidden email]> wrote:
On 3/8/20 8:49 AM, Alex Otenko via Concurrency-interest wrote:
> I think it may be useful to clarify who is meant to call each of the
> close methods.

Right; thanks especially for the questions about what confused
programmers who aren't used to dealing with half-closed states might do.
 Probably best to stop using "close" except for full close. And kill
closeForReceiving. Leaving better names and simpler specs:

interface Carriable<T> extends AutoCloseable {
    boolean isClosed();
    boolean isFinishedSending();    // closed or sending disabled
    //...
}
interface CarrierSender<T> extends Carriable<T> {
    void finishSending();           // disable sending; close when empty
    // ...
}

Full versions as usual at: http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
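
One possible reading of the "disable sending; close when empty" comment, as a minimal non-blocking sketch. The class name ToyCarrier, the use of Optional in tryReceive, and the single-lock design are assumptions made for illustration; this is not the draft implementation.

```java
import java.util.ArrayDeque;
import java.util.Optional;

// Hypothetical toy, NOT the draft LinkedCarrier: a non-blocking buffer that
// illustrates "finishSending disables sending; the carrier closes when empty".
final class ToyCarrier<T> implements AutoCloseable {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private boolean finishedSending;   // set by finishSending() or close()
    private boolean closed;            // full close

    synchronized boolean trySend(T item) {
        if (finishedSending) return false;   // rejected after finishSending or close
        items.add(item);
        return true;
    }

    synchronized Optional<T> tryReceive() {
        T item = items.poll();               // null if nothing is buffered
        if (finishedSending && items.isEmpty()) closed = true;  // drained: full close
        return Optional.ofNullable(item);
    }

    synchronized void finishSending() {
        finishedSending = true;
        if (items.isEmpty()) closed = true;  // nothing left to drain
    }

    @Override
    public synchronized void close() {       // immediate close discards buffered items
        finishedSending = true;
        closed = true;
        items.clear();
    }

    synchronized boolean isFinishedSending() { return finishedSending; }
    synchronized boolean isClosed() { return closed; }
}

class FinishSendingDemo {
    public static void main(String[] args) {
        ToyCarrier<String> c = new ToyCarrier<>();
        c.trySend("a");
        c.finishSending();
        System.out.println(c.trySend("b"));   // false: sending is disabled
        System.out.println(c.isClosed());     // false: "a" is still buffered
        System.out.println(c.tryReceive());   // Optional[a]
        System.out.println(c.isClosed());     // true: drained after finishSending
    }
}
```

Note that in this reading isFinishedSending() is true both after finishSending() and after close(), matching the "closed or sending disabled" comment.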



Re: draft Carrier API

JSR166 Concurrency mailing list
Just a quick comment,
I would much prefer it if Carrier stuck with the standard [long
timeout, TimeUnit unit] parameters used throughout java.util.concurrent,
instead of adopting Duration for a single class.

/Kasper


Re: draft Carrier API

JSR166 Concurrency mailing list
On 3/8/20 12:36 PM, Alex Otenko wrote:
> I am familiar with half closed states, but in the socket world. The use
> of the same term is lost on me.

Sorry for the mysterious-sounding answer. "Half-closed" is a design
pattern that might never have been written up well but is encountered
all the time in concurrent settings under different names. Which makes
the naming challenging here. On a little more thought, probably the best
choice of terms here is one based on those used in ExecutorService, which
most people are more familiar with, and where shutdownNow() is a full
close, and shutdown() disables submissions and triggers full close when
tasks are completed. So:

interface CarrierSender<T> extends Carriable<T> {
  void shutdownSending(); // disable further sending; close when empty
  // ...
}
interface Carriable<T> extends AutoCloseable {
  boolean isClosed();
  boolean isShutdownSending(); // true after shutdownSending or close
  // ...
}
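
For readers less familiar with the ExecutorService behavior being borrowed here, a small demonstration using only standard java.util.concurrent calls. The class and method names (ShutdownAnalogyDemo and so on) are made up for this example; the mapping to shutdownSending() and close() in the comments is the analogy from the text, not a claim about the draft implementation.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ShutdownAnalogyDemo {
    // shutdown(): new submissions are rejected, already-submitted tasks still run,
    // and the executor terminates once they finish (compare shutdownSending()).
    static boolean gracefulShutdownDrains() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch ran = new CountDownLatch(2);
        pool.execute(ran::countDown);
        pool.execute(ran::countDown);
        pool.shutdown();
        boolean rejected = false;
        try {
            pool.execute(() -> {});
        } catch (RejectedExecutionException expected) {
            rejected = true;
        }
        boolean drained = ran.await(5, TimeUnit.SECONDS);
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected && drained && terminated;
    }

    // shutdownNow(): interrupts the running task and hands back tasks that
    // never started (compare a full close()).
    static int abruptShutdownDiscards() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();              // blocks until interrupted by shutdownNow
            } catch (InterruptedException e) {
                // fall through and let the worker exit
            }
        });
        pool.execute(() -> {});               // queued behind the blocked task
        started.await();
        List<Runnable> neverRan = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return neverRan.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(gracefulShutdownDrains());   // true
        System.out.println(abruptShutdownDiscards());   // 1
    }
}
```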



Re: draft Carrier API

JSR166 Concurrency mailing list
On 3/8/20 6:59 PM, Kasper Nielsen wrote:
> Just a quick comment,
> I would much prefer it if Carrier stuck with the standard [long
> timeout, TimeUnit unit] parameters used throughout java.util.concurrent,
> instead of adopting Duration for a single class.
>

Right. Actually, include both. I listed it without TimeUnit overloads
mainly as a check that we'd get responses confirming that current
BlockingQueue etc users would consider Carrier as an alternative.

(See updates at http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java)
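
A sketch of how the two timeout styles could coexist, with the Duration overload delegating to the (long, TimeUnit) form. TimedReceiver is a hypothetical wrapper over a plain BlockingQueue, written only to show the overload pair; unlike the draft, it lets InterruptedException propagate instead of applying an OnInterrupt policy.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical wrapper, not part of the draft API.
final class TimedReceiver<T> {
    private final BlockingQueue<T> queue;

    TimedReceiver(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    // Conventional java.util.concurrent style.
    T receive(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        T item = queue.poll(timeout, unit);   // null means the wait timed out
        if (item == null) throw new TimeoutException();
        return item;
    }

    // java.time style. Duration.toNanos() throws ArithmeticException for
    // durations too large to fit in a long (roughly 292 years).
    T receive(Duration timeout) throws InterruptedException, TimeoutException {
        return receive(timeout.toNanos(), TimeUnit.NANOSECONDS);
    }
}

class TimedReceiverDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.add("x");
        TimedReceiver<String> r = new TimedReceiver<>(q);
        System.out.println(r.receive(Duration.ofMillis(50)));   // x
        try {
            r.receive(10, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out");                     // queue is empty now
        }
    }
}
```

Keeping the (long, TimeUnit) form as the primary one means existing BlockingQueue callers can switch without touching their timeout arguments.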

-Doug



Re: draft Carrier API

JSR166 Concurrency mailing list
Hi Doug,

> On 7 Mar 2020, at 16:29, Doug Lea via Concurrency-interest <[hidden email]> wrote:
>
>
> Collecting replies/responses, with updated sketches pasted below and at
> http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java


This latest version looks like it is headed in the right direction.

Just a few questions / clarifications around closing:

1) closeExceptionally - what effect will the passed `cause` Throwable
   have on other methods, say, like receive? Will the passed `cause` be
   the cause of the ClosedException ( thrown from a thread blocked in
   receive )?   I assume the `onClose` CF will receive this cause too?

2) ClosedException is an IllegalStateException - Ok. If the OnInterrupt
   policy is `CLOSE`, then a thread already blocked in a receive
   invocation will throw ClosedException - same as it would if the
   carrier was already closed before the receive invocation. Receiving
   an IllegalStateException for an interrupt seems a little odd to me
   for this case (but maybe that is ok). Given this, then it is not
   possible to discern the difference between a carrier that was closed
   prior to receive or if receive was interrupted.  Hmm... maybe this is
   the point - consistent behavior in the face of async-close? Oops...
   now I ask myself will async-close result in ClosedException, or
   interrupt of waiters?
   
3) Should all carriers be, in effect, closeable? What would be the
   effect of Carrier.discardingCarrier().close()? Should this carrier be
   effectively uncloseable, so there could be a singleton instance?

-Chris.

Re: draft Carrier API

JSR166 Concurrency mailing list
IllegalStateException is ok if the receiver should have known there are no more items to receive. This is a good idea for streams of definite length, where the length is known to the receiver before entering receive(). It doesn't seem like a good idea for indefinite-length cases, such as a loop that reads all items until EOF.

If you use exceptions to signal EOF, neat loops turn into a kludge of an infinite loop with try-catch and a break inside, messing up scopes in the process.

Neat loop:
for(Optional<T> x = carrier.receive(); !x.isEmpty(); x = carrier.receive()) ...

Ugly scope and nesting:
for (;;) {
  T x;
  try {
    x = carrier.receive();
  } catch (StopIterationException sie) {
    // did it collect a stack trace that is not needed?
    break;
  }
  ...
}

Alex



Re: draft Carrier API

JSR166 Concurrency mailing list
On 3/9/20 10:49 AM, Chris Hegarty via Concurrency-interest wrote:

>
> 1) closeExceptionally - what effect will the passed `cause` Throwable
>    have on other methods, say, like receive? Will the passed `cause` be
>    the cause of the ClosedException ( thrown from a thread blocked in
>    receive )?  

Yes. To be clearer, method getCloseException  should be renamed
getCloseCause (returning null if non-exceptional).

>  I assume the `onClose` CF will receive this cause too?

The CF holds "this" (the carrier), so the cause can be accessed through it.
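
To make the answer to (1) concrete, a minimal sketch of a close cause propagating into ClosedException. Everything beyond the names closeExceptionally, getCloseCause, and ClosedException is assumed: the two-argument constructor, the source() accessor, and the non-blocking CauseTrackingCarrier toy are illustrations, not the draft's signatures.

```java
import java.util.ArrayDeque;

// Assumed shape: the draft only sketches ClosedException(AutoCloseable c).
class ClosedException extends IllegalStateException {
    private final AutoCloseable source;

    ClosedException(AutoCloseable source, Throwable cause) {
        super("closed", cause);   // cause is null for a non-exceptional close
        this.source = source;
    }

    AutoCloseable source() { return source; }
}

// Hypothetical non-blocking toy: receive() returns null when empty instead of waiting.
final class CauseTrackingCarrier<T> implements AutoCloseable {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private boolean closed;
    private Throwable closeCause;   // stays null unless closed exceptionally

    synchronized void send(T item) {
        if (closed) throw new ClosedException(this, closeCause);
        items.add(item);
    }

    synchronized T receive() {
        if (closed) throw new ClosedException(this, closeCause);
        return items.poll();
    }

    synchronized void closeExceptionally(Throwable cause) {
        if (!closed) {              // first close wins
            closed = true;
            closeCause = cause;
            items.clear();
        }
    }

    @Override
    public synchronized void close() {
        closeExceptionally(null);
    }

    synchronized Throwable getCloseCause() { return closeCause; }
}

class CloseCauseDemo {
    public static void main(String[] args) {
        CauseTrackingCarrier<String> c = new CauseTrackingCarrier<>();
        Exception boom = new java.io.IOException("upstream failed");
        c.closeExceptionally(boom);
        try {
            c.receive();
        } catch (ClosedException e) {
            System.out.println(e.getCause() == boom);   // true
        }
    }
}
```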

>
> 2) ClosedException is an IllegalStateException - Ok. If the OnInterrupt
>    policy is `CLOSE`, then a thread already blocked in a receive
>    invocation will throw ClosedException - same as it would if the
>    carrier was already closed before the receive invocation. Receiving
>    an IllegalStateException for an interrupt seems a little odd to me
>    for this case (but maybe that is ok). Given this, then it is not
>    possible to discern the difference between a carrier that was closed
>    prior to receive or if receive was interrupted.  Hmm... maybe this is
>    the point - consistent behavior in the face of async-close?

Right.
>    now I ask myself will async-close result in ClosedException, or
>    interrupt of waiters?
Abrupt closes always interrupt blocked threads, and even under "IGNORE"
policy will cause them to throw ClosedException. (This is one of several
reasons for policy-based interrupt handling.)

>    
> 3) Should all carriers be, in effect, closeable? What would be the
>    effect of Carrier.discardingCarrier().close()? Should this carrier be
>    effectively uncloseable, so there could be a singleton instance?

Not sure.  It's analogous to ForkJoinPool.commonPool, which just
ignores shutdown, in explicit violation of the ExecutorService spec. Which
no one complains about. The same could be done here. I suppose that the
spec for close could be phrased in a way that allows "permanent"
entities to ignore close.
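
The commonPool behavior referred to here can be checked directly with standard API calls; only the demo class name is made up.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolShutdownDemo {
    // Calls shutdown() on the common pool, then submits work to it anyway.
    static int runAfterShutdown() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();   // documented to have no effect on the common pool
        return common.submit(() -> 6 * 7).join();
    }

    public static void main(String[] args) {
        System.out.println(runAfterShutdown());                      // 42
        System.out.println(ForkJoinPool.commonPool().isShutdown());  // false
    }
}
```

A singleton discardingCarrier() whose close() is a no-op would behave the same way: callers can still write try-with-resources around it without affecting other users.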

-Doug




Re: draft Carrier API

JSR166 Concurrency mailing list
Thanks Doug, all sounds good to me.

-Chris.



Re: draft Carrier API

JSR166 Concurrency mailing list
On 3/9/20 3:29 PM, Alex Otenko wrote:
> IllegalStateException is ok if receiver should've known there are no
> more items to receive. This is a good idea in cases with definite length
> of stream, and the length being known to the receiver before entering
> receive(). This doesn't seem like a good idea for indefinite length
> cases - like, loop to read all items until eof.
>
This is the reason for:
    Stream<T> stream();             // destructive (consume-on-traverse)
But it is also sensible to provide a simpler forEach analog:
    long consumeEach(Consumer<? super T> proc); // return count

For those who need stateful loops, we could add "eventually" forms of
tryReceive. With non-value-types, the preferable form that can co-exist
with value-types is usually to return a resultIfAbsent (that is almost
always chosen to be null), and for value types, Optional. To avoid
annoying people, we should probably have both.

    T tryReceive(T resultIfAbsent); // resultIfAbsent if closed or empty
    Optional<T> tryReceive();       // Optional.empty if closed or empty

    T tryReceiveEventually(T resultIfAbsent); // resultIfAbsent if closed
    Optional<T> tryReceiveEventually(); // Optional.empty if closed

Maybe there is a better method name.
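
A rough sketch of what the Optional-returning "eventually" form could look like, and of the stateful loop it enables. EventualCarrier is a hypothetical monitor-based toy; apart from tryReceiveEventually the names are assumptions, and it throws InterruptedException rather than following an OnInterrupt policy.

```java
import java.util.ArrayDeque;
import java.util.Optional;

// Hypothetical toy, not the draft implementation.
final class EventualCarrier<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private boolean finishedSending;

    synchronized void send(T item) {
        if (finishedSending) throw new IllegalStateException("sending finished");
        items.add(item);
        notifyAll();
    }

    synchronized void finishSending() {
        finishedSending = true;
        notifyAll();   // wake blocked receivers so they can observe end of stream
    }

    // Blocks while empty and still open for sending.
    // Returns Optional.empty() only once sending is finished and the buffer is drained.
    synchronized Optional<T> tryReceiveEventually() throws InterruptedException {
        while (items.isEmpty() && !finishedSending) {
            wait();
        }
        return Optional.ofNullable(items.poll());
    }
}

class EventualReceiveDemo {
    // The "neat loop": no try/catch is needed to detect the end of the stream.
    static int drain(EventualCarrier<Integer> carrier) throws InterruptedException {
        int sum = 0;
        for (Optional<Integer> x = carrier.tryReceiveEventually(); x.isPresent();
             x = carrier.tryReceiveEventually()) {
            sum += x.get();
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        EventualCarrier<Integer> carrier = new EventualCarrier<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) carrier.send(i);
            carrier.finishSending();
        });
        producer.start();
        System.out.println(drain(carrier));   // 6
        producer.join();
    }
}
```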

(See updates at http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java)

-Doug
