proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
I believe there exists a deep analogy between the synchronous and asynchronous
worlds. Just as any method can call other methods /on the same thread/, any
asynchronous procedure must be able to start other asynchronous procedures
/on the same thread pool/. To enable this, I propose adding a new method to
the standard Java API which would return a reference to the thread pool
to which the current thread belongs. If the current thread does not belong to
any thread pool, then the user should be able to set this property to some
default thread pool.

By asynchronous procedure I mean java.util.concurrent.CompletableFuture,
java.nio.channels.CompletionHandler, or any construct which requires a
thread pool to execute: Akka actor, Kotlin coroutine, RxJava Observable, etc.

I believe it should be named java.lang.Thread.currentExecutor(), similar to
Thread.currentThread(), but it could also be named defaultExecutor or
defaultThreadPool.

I am looking for help and support to compose a JEP (JDK Enhancement
Proposal).



--
Sent from: http://jsr166-concurrency.10961.n7.nabble.com/
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Hi Alexei,

If, for a moment, we limit the applicability of your proposal to
CompletableFuture only, then the obvious question is: "What would we
want to achieve with such an API addition?". You say that you want to be
able to execute an asynchronous task in the same Executor as the
executor of the thread executing the previous asynchronous task. Is that the
behavior you want to achieve? Well, I would want to achieve the same or
a similar thing on many occasions. So let's see some examples:

     CompletableFuture.supplyAsync(Supplier, Executor)

This case is obvious. The task is submitted by the thread that executes
the supplyAsync method. So if this thread is itself executing an asynchronous
computation, we would achieve the desired effect by passing the result of
Thread.currentExecutor() to this method. But what about this:

     CompletableFuture cf = ....;
     ...

     cf.thenApplyAsync(Function, Executor)

What is the executor of the thread executing an asynchronous computation
represented by 'cf' which is the "previous" asynchronous task? It can be
and usually is a different executor than the executor of the thread
executing the thenApplyAsync method. So passing the result of
Thread.currentExecutor() into this method would not achieve the desired
effect.

That said, I too would need similar functionality on many occasions.
You can get "nearly" the desired functionality by using the
non-asynchronous methods of CompletableFuture. For example:

     CompletableFuture cf = ....;
     ...

     cf.thenApply(Function)

The documentation says that the submitted task will be executed
synchronously by the same thread that completes 'cf', unless 'cf' has
already been completed before the call to thenApply, in which case
it will be executed by the current thread (the one calling
thenApply). So the behavior depends on timing. That's very unfortunate if
the thread calling thenApply is a scarce resource (for example the
single thread of an event loop) that is meant only to dispatch events:
if the timing is right, it will also end up executing tasks.
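The timing dependence described above can be reproduced deterministically. The following is only a minimal sketch: the class and method names are hypothetical, and the thread name "completer" is chosen purely for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ThenApplyTiming {

    // Name of the thread that ran the thenApply function when the
    // source future was already complete at registration time.
    static String threadWhenAlreadyCompleted() {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("x");
        // cf is done, so the function runs right here, on the calling thread.
        return cf.thenApply(v -> Thread.currentThread().getName()).join();
    }

    // Name of the thread that ran the thenApply function when the
    // stage was registered before the source future completed.
    static String threadWhenCompletedLater() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        CompletableFuture<String> stage =
                cf.thenApply(v -> Thread.currentThread().getName());
        // The completing thread runs the dependent function.
        Thread completer = new Thread(() -> cf.complete("x"), "completer");
        completer.start();
        return stage.join();
    }

    public static void main(String[] args) {
        System.out.println("already completed: " + threadWhenAlreadyCompleted());
        System.out.println("completed later:   " + threadWhenCompletedLater());
    }
}
```

In the first case the caller of thenApply does the work itself, which is exactly the situation an event-loop thread would want to avoid.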

So what would be very useful for CompletableFuture (or another kind of
CompletionStage?) would be to have a special mode of building the
execution chains of tasks that would never employ the thread that calls
the CF API to actually execute the tasks.

This can be achieved today with consistent calls to thenXXXAsync()
methods, passing the desired Executor, but then each task is really
executed in its own asynchronous computation. If one wanted to
"append" a task that would execute synchronously in the same thread as
the previous task in the chain, immediately after it if possible, or else
in the same executor as the previous task, this can't be achieved today.

I'm thinking of the following semantics and method names... We already
have thenXxxAsync methods without an Executor parameter and thenXxxAsync
methods with an Executor parameter, both with defined semantics. We already
have thenXxx methods without an Executor parameter, with defined semantics.
An obvious candidate for the new semantics would be thenXxx methods with an
Executor parameter. For example:

     CompletableFuture cf = ....;
     ...

     cf.thenApply(Function, Executor)

The behaviour of this method would be: If the 'cf' is not completed yet,
it submits a task that will be completed synchronously in the same
thread that completes the 'cf'. If 'cf' is already completed, it submits
an asynchronous task to be executed in the provided Executor.

Would that make sense as an addition to the CompletableFuture API?

Regards, Peter



Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
On Mon, Mar 11, 2019 at 5:48 PM Alexei Kaigorodov via
Concurrency-interest <[hidden email]> wrote:
>
> To enable this, I propose to add new method to
> the standard java API which would return the reference  to the thread pool
> to which current thread belongs.

If I were you I would make a good case for that first. Explain
exhaustively why this feature is needed, with examples. List the
alternatives and listen carefully to the feedback. Rinse, repeat. JEP
is more of a procedural step compared to those previous steps and can
be done later.

-Pavel

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
On 3/12/19 5:58 AM, Peter Levart via Concurrency-interest wrote:

>     CompletableFuture cf = ....;
>     ...
>
>     cf.thenApply(Function, Executor)
>
> The behaviour of this method would be: If the 'cf' is not completed yet,
> it submits a task that will be completed synchronously in the same
> thread that completes the 'cf'. If 'cf' is already completed, it submits
> an asynchronous task to be executed in the provided Executor.
>

Thanks for the analysis. I agree that if we had this method (and its
variants), people would be unlikely to want a currentExecutor() method.
(Which is not quite to conclude yet that we should do it.)

And those who still do want some form of currentExecutor() could just
use a ThreadLocal.
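One way the ThreadLocal route might look for a pool that the user controls is sketched below. It assumes a hypothetical ThreadPoolExecutor subclass (SelfAwarePool) that records itself in a ThreadLocal via the beforeExecute/afterExecute hooks, which ThreadPoolExecutor invokes on the worker thread around each task. The static currentExecutor() here is a stand-in for the proposed method, not an existing JDK API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SelfAwarePool extends ThreadPoolExecutor {
    // Set only while one of this class's workers is running a task.
    private static final ThreadLocal<Executor> CURRENT = new ThreadLocal<>();

    SelfAwarePool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>());
    }

    // Hypothetical stand-in for currentExecutor(); null outside such a pool.
    static Executor currentExecutor() {
        return CURRENT.get();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        CURRENT.set(this);          // runs on the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        CURRENT.remove();           // runs on the worker thread, after the task
        super.afterExecute(r, t);
    }

    // True if a task submitted to the pool sees that same pool.
    static boolean demo() {
        SelfAwarePool pool = new SelfAwarePool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> currentExecutor() == pool, pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task sees its own pool: " + demo());
    }
}
```

This only helps for pools created through such a subclass; executors obtained elsewhere remain invisible to it.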

Further opinions welcome though.

-Doug



Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Check if the current Thread’s ThreadGroup is instanceof Executor? ;-)

--
Cheers,


Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
I think what Peter wants is more like:

  public static <T, U> CompletableFuture<U> then(CompletableFuture<T> cf,
                                                 Function<T, U> f) {
    final AtomicInteger ai = new AtomicInteger();
    final CompletableFuture<U> that = cf.thenCompose(i -> ai.get() == 0 ?
                                                           cf.thenApplyAsync(f):
                                                           cf.thenApply(f));
    ai.set(1);
    return that;
  }

Not so much a ThreadLocal, but a witness of who got to thenCompose first: this method, or someone concurrent.

Alex



Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Peter,

I think the proposed semantics of the method "thenApply(Function, Executor)" are misleading. The user provides an
Executor, but sometimes the Function is executed on that Executor, and sometimes not. This is very confusing.

Anyway, this is not similar to what I am looking for.

thanks,
Alexei


Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Pavel,

Suppose we have a simple method asyncJob0 and call it asynchronously:

    Executor exec = ForkJoinPool.commonPool();
    CompletableFuture<String> cf0 = CompletableFuture.supplyAsync(CurrentExec::asyncJob0, exec);
    System.out.println(cf0.get());

Sooner or later, we want to refactor (parallelize) the method asyncJob0, but we do not want to change the place where it is called. This is a common use case when a library is evolving. So we change the method as follows (its name is changed to asyncJob1 only for convenience; in real life it would stay the same):

static String asyncJob1() {
    Executor exec = currentExecutor();
    CompletableFuture<String> cf0 = CompletableFuture.supplyAsync(CurrentExec::asyncJob0, exec);
    String part1 = "+asyncJob1"; // suppose this is a heavy computation
    try {
        return cf0.get()+part1;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

So we made 2 parallel branches: one computes cf0 and the other computes part1. We want to compute cf0 on the same thread pool that is currently in use, simply because we have no other computational resources at hand: all resources are controlled by the user, and we do not want to bother the user with resolving our problems. For this we call exec = currentExecutor(); the implementation in the gist works for ForkJoinPool but does not work for other pools (@Viktor Klang: checking whether the thread group implements Executor does not work for the result of Executors.newFixedThreadPool()).

This solution is not ideal because it may block when calling cf0.get(), but that is a result of limitations of the CompletableFuture design: we have no other way to provide the result. The right solution would be to replace the single asynchronous procedure call of asyncJob0 with 3: two parallel branches plus computing the final result. This is possible when some other asynchronous frameworks are used.
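For readers without access to the gist, a currentExecutor() that works only for ForkJoinPool might look roughly like the sketch below. This is not the gist's code. The class name, the setDefaultExecutor method and the choice of the common pool as the initial fallback are assumptions made for illustration; the only JDK facility relied on is ForkJoinWorkerThread.getPool().

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

final class CurrentExecutorSketch {
    // Fallback for threads that do not belong to a ForkJoinPool (assumption).
    private static volatile Executor defaultExecutor = ForkJoinPool.commonPool();

    static void setDefaultExecutor(Executor executor) {
        defaultExecutor = Objects.requireNonNull(executor);
    }

    // Hypothetical currentExecutor(): the owning pool for ForkJoinPool
    // workers, the user-set default for every other thread.
    static Executor currentExecutor() {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            return ((ForkJoinWorkerThread) t).getPool();
        }
        return defaultExecutor;
    }

    // True if a task running in a private ForkJoinPool sees that pool.
    static boolean seesOwnPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> currentExecutor() == pool, pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees its own pool: " + seesOwnPool());
    }
}
```

Threads created by Executors.newFixedThreadPool() are plain threads, so for them this sketch can only return the fallback, which is the gap the proposal is about.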


thanks
Alexei



Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
An Executors.currentExecutor() seems more feasible given that Thread is in java.lang.

However, there are tons of subtleties here. For instance, sometimes an Executor is simply overlaying logic on top of another Executor; in those cases you’d want the overlaying Executor to be returned.

I assume it would be rather simple to retrofit the current jsr166 pools to do the right thing, but there are many customizations/alternative implementations out there and I see no easy solution to that.

Cheers,
V




Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list


On 3/12/19 5:00 PM, Alex Otenko via Concurrency-interest wrote:

> I think what Peter wants is more like:
>
>   public static <T, U> CompletableFuture<U> then(CompletableFuture<T> cf,
>                                                  Function<T, U> f) {
>     final AtomicInteger ai = new AtomicInteger();
>     final CompletableFuture<U> that = cf.thenCompose(i -> ai.get() == 0 ?
>                                                            cf.thenApplyAsync(f):
>                                                            cf.thenApply(f));
>     ai.set(1);
>     return that;
>   }
>
> Not so much a ThreadLocal, but a witness of who got to thenCompose first: this method, or someone concurrent.
>
> Alex

Yes, the semantics of the above method are what I was thinking about. Now that Alex has (again) found the solution with thenCompose, I can modify it into a similar solution (without the harmless race) that is hopefully also easier to understand:

    public static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf, Function<? super T, ? extends U> fn) {
        Thread thisThread = Thread.currentThread();
        return cf.thenCompose(t -> Thread.currentThread() == thisThread
                                   ? cf.thenApplyAsync(fn)
                                   : cf.thenApply(fn));
    }

Which suggests that the solution is very easy to implement as static methods. It is not as pretty as CF instance methods would be, but will do for now.
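The helper above can be exercised as follows. This is only a sketch: the enclosing class name and the two check methods are hypothetical, and the helper body is copied so that the example compiles on its own. It assumes the calling thread is an ordinary thread and not a ForkJoinPool worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ThenApplyOffCaller {

    // Copy of the static helper from the message above.
    static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf,
                                                 Function<? super T, ? extends U> fn) {
        Thread thisThread = Thread.currentThread();
        return cf.thenCompose(t -> Thread.currentThread() == thisThread
                                   ? cf.thenApplyAsync(fn)
                                   : cf.thenApply(fn));
    }

    // Source already complete: fn is dispatched asynchronously,
    // so it must not run on the calling thread.
    static boolean offCallerWhenAlreadyCompleted() {
        Thread caller = Thread.currentThread();
        CompletableFuture<String> cf = CompletableFuture.completedFuture("x");
        CompletableFuture<Thread> stage = thenApply(cf, v -> Thread.currentThread());
        return stage.join() != caller;
    }

    // Source completed later by another thread: fn runs synchronously
    // on that completing thread.
    static boolean onCompleterWhenCompletedLater() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        CompletableFuture<Thread> stage = thenApply(cf, v -> Thread.currentThread());
        Thread completer = new Thread(() -> cf.complete("x"), "completer");
        completer.start();
        return stage.join() == completer;
    }

    public static void main(String[] args) {
        System.out.println("off caller:   " + offCallerWhenAlreadyCompleted());
        System.out.println("on completer: " + onCompleterWhenCompletedLater());
    }
}
```

Note that the async branch uses the default executor of thenApplyAsync, not the executor that completed 'cf'; passing an Executor to thenApplyAsync would be the natural extension.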

Thanks,

Peter




Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
On Wed, 13 Mar 2019 at 01:05, Viktor Klang <[hidden email]> wrote:

> An Executors.currentExecutor() seems more feasible given that Thread is in java.lang.

Since this is a static method, the enclosing class does not matter. I proposed class Thread only because the other way to get access to the current pool is to promote the method ForkJoinWorkerThread.getPool() to the class java.lang.Thread.

> However, there are tons of subtleties here. For instance, sometimes an Executor is simply overlaying logic on top of another Executor; in those cases you’d want the overlaying Executor to be returned.

I don't think so. Getting the exposed Executor is OK.

> I assume it would be rather simple to retrofit the current jsr166 pools to do the right thing, but there are many customizations/alternative implementations out there and I see no easy solution to that.

Yes indeed, we cannot expect to get an executor in all cases, also because the current thread may not be a member of any thread pool.

> Cheers,
> V

thanks,
Alexei


Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Hi Alexei,

On 3/12/19 9:09 PM, Alexei Kaigorodov via Concurrency-interest wrote:

> On Wed, 13 Mar 2019 at 01:05, Viktor Klang <[hidden email]> wrote:
>> An Executors.currentExecutor() seems more feasible given that Thread is in java.lang.
>
> Since this is a static method, the enclosing class does not matter. I proposed class Thread only because the other way to get access to the current pool is to promote the method ForkJoinWorkerThread.getPool() to the class java.lang.Thread.
>
>> However, there are tons of subtleties here. For instance, sometimes an Executor is simply overlaying logic on top of another Executor; in those cases you’d want the overlaying Executor to be returned.
>
> I don't think so. Getting the exposed Executor is OK.

That might not be safe. Executors get wrapped for reasons. For example, Executors.unconfigurableExecutorService(ExecutorService) shields the underlying ExecutorService from being modified. Getting to it would defeat the purpose of wrapping it.
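The shielding effect can be seen in a few lines. A minimal sketch follows; the class and method names are hypothetical. That newFixedThreadPool() returns a ThreadPoolExecutor is an OpenJDK implementation detail rather than a documented guarantee.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class WrappedExecutorDemo {

    // True if the shielded view can still be cast back to the
    // configurable ThreadPoolExecutor type.
    static boolean shieldedIsConfigurable() {
        ExecutorService raw = Executors.newFixedThreadPool(2);
        try {
            ExecutorService shielded = Executors.unconfigurableExecutorService(raw);
            // The wrapper exposes only ExecutorService methods, so callers
            // cannot reach setCorePoolSize() and friends through it.
            return shielded instanceof ThreadPoolExecutor;
        } finally {
            raw.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shielded is configurable: " + shieldedIsConfigurable());
    }
}
```

A currentExecutor() that handed out the underlying pool from inside a task would reopen the access that this wrapper was meant to close.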

OTOH, if you can make the "client" that chooses the Executor for the async computation wrap it in a special delegating AccessibleExecutor, you could then access it in the body of the asyncJob() method via a ThreadLocal:

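import java.util.concurrent.Executor;

public class AccessibleExecutor implements Executor {
    // Holds the wrapper that is running the current task, if any.
    private static final ThreadLocal<AccessibleExecutor> EXECUTOR_TL = new ThreadLocal<>();

    // Returns the AccessibleExecutor running the current task,
    // or null when called outside of such a task.
    public static Executor getCurrent() {
        return EXECUTOR_TL.get();
    }

    private final Executor executor;

    public AccessibleExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(Runnable command) {
        executor.execute(() -> {
            // Publish this wrapper for the duration of the task only.
            EXECUTOR_TL.set(this);
            try {
                command.run();
            } finally {
                EXECUTOR_TL.remove();
            }
        });
    }
}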

// and then ...

Executor exec0 = ...
Executor exec = new AccessibleExecutor(exec0);
CompletableFuture.supplyAsync(CurrentExec::asyncJob, exec);

static String asyncJob() {
    Executor exec = AccessibleExecutor.getCurrent();
    ...
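Putting the wrapper together with the asyncJob0/asyncJob1 example from earlier in the thread gives roughly the following. This is a sketch under assumptions: the demo class name, the run() method and the pool size of 2 are illustrative, and a compact copy of the wrapper is nested inside so that the example compiles on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AccessibleExecutorDemo {

    // Compact copy of the wrapper from the message above.
    static final class AccessibleExecutor implements Executor {
        private static final ThreadLocal<AccessibleExecutor> EXECUTOR_TL = new ThreadLocal<>();
        private final Executor executor;

        AccessibleExecutor(Executor executor) { this.executor = executor; }

        static Executor getCurrent() { return EXECUTOR_TL.get(); }

        @Override
        public void execute(Runnable command) {
            executor.execute(() -> {
                EXECUTOR_TL.set(this);
                try { command.run(); } finally { EXECUTOR_TL.remove(); }
            });
        }
    }

    static String asyncJob0() {
        return "asyncJob0";
    }

    // The refactored job: forks asyncJob0 onto the executor it runs on.
    static String asyncJob1() {
        Executor exec = AccessibleExecutor.getCurrent();
        CompletableFuture<String> cf0 =
                CompletableFuture.supplyAsync(AccessibleExecutorDemo::asyncJob0, exec);
        String part1 = "+asyncJob1";
        // Blocks a pool thread; with a single-threaded pool this would deadlock.
        return cf0.join() + part1;
    }

    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Executor exec = new AccessibleExecutor(pool);
            return CompletableFuture
                    .supplyAsync(AccessibleExecutorDemo::asyncJob1, exec)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The call site stays unchanged apart from the one-time wrapping of the executor, which is the part the "client" has to agree to.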


Regards, Peter

 
I assume it would be rather simple to retrofit current jsr166 pools to do the right thing, but there are many customizations/alternative implementations out there and I see no easy solution to that.
 Yes indeed, we cannot expect to get executor in all cases, also because the current thread may not be a member of any thread pool. 

Cheers,
V

thanks,
Alexei
 

On Tue, 12 Mar 2019 at 18:48, Alexei Kaigorodov via Concurrency-interest <[hidden email]> wrote:
Pavel,

Let we have a simple method asyncJob0 and call it asynchronously:
    Executor exec = ForkJoinPool.commonPool();
    CompletableFuture<String> cf0 = CompletableFuture.supplyAsync(CurrentExec::asyncJob0, exec);
    System.out.println(cf0.get());

Sooner or later, we want to refactor (parallelize) the method asyncJob0, but we do not want to change the place where it is called. This is a common use case when a library is evolving. So we change the method as follows (its name changed to asyncJob1 only for convenience, in real life it would not):

static String asyncJob1() {
    Executor exec = currentExecutor();
    CompletableFuture<String> cf0 = CompletableFuture.supplyAsync(CurrentExec::asyncJob0, exec);
    String part1 = "+asyncJob1"; // suppose this is a heavy computation
    try {
        return cf0.get()+part1;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

So we made two parallel branches: one computes cf0 and the other computes part1. We want to compute cf0 on the same thread pool that is currently in use, simply because we have no other computational resources at hand: all resources are controlled by the user, and we do not want to bother them with resolving our problems. For this we call exec = currentExecutor(); the implementation in the gist works for ForkJoinPool but does not work for other pools (@Viktor Klang: checking whether the thread group implements Executor does not work for the result of Executors.newFixedThreadPool()).

This solution is not ideal because it may block in the call to cf0.get(), but that is a consequence of the limitations of the CompletableFuture design: we have no other way to provide the result. The right solution would be to replace the single asynchronous procedure call of asyncJob0 with three: two parallel branches plus a third that computes the final result. This is possible when some other asynchronous frameworks are used.
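
For illustration, here is a minimal sketch of what a currentExecutor() helper could look like for the ForkJoinPool case only. It is not the code from the gist: the class name CurrentExecSketch and the fallback parameter are assumptions made for this example. It relies on ForkJoinWorkerThread.getPool(), which is also why it cannot recognize threads that belong to other pools such as the result of Executors.newFixedThreadPool().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

class CurrentExecSketch {
    /** Returns the ForkJoinPool that owns the current thread, or the fallback otherwise. */
    static Executor currentExecutor(Executor fallback) {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            return ((ForkJoinWorkerThread) t).getPool();
        }
        // Threads of other pools carry no reference to their pool, so we cannot detect it.
        return fallback;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        Executor fallback = Runnable::run; // hypothetical default: run in the calling thread

        // Inside a task of 'pool' the helper finds the owning pool.
        Executor seen = CompletableFuture
                .supplyAsync(() -> currentExecutor(fallback), pool)
                .get();
        System.out.println("inside pool task, found pool: " + (seen == pool));

        // On a plain thread the helper can only return the fallback.
        System.out.println("on main thread, got fallback: " + (currentExecutor(fallback) == fallback));
        pool.shutdown();
    }
}
```

The explicit fallback makes the "current thread does not belong to any pool" case visible to the caller instead of hiding it behind a global default.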


thanks

Alexei


On Tue, 12 Mar 2019 at 18:51, Pavel Rappo <[hidden email]> wrote:
On Mon, Mar 11, 2019 at 5:48 PM Alexei Kaigorodov via
Concurrency-interest <[hidden email]> wrote:
>
> To enable this, I propose to add new method to
> the standard java API which would return the reference  to the thread pool
> to which current thread belongs.

If I were you I would make a good case for that first. Explain
exhaustively why this feature is needed, with examples. List the
alternatives and listen carefully to the feedback. Rinse, repeat. JEP
is more of a procedural step compared to those previous steps and can
be done later.

-Pavel

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
Thanks for providing that example.

I agree there is some "analogy between synchronous and asynchronous worlds".
However, it's not clear how to extend what we have in the world of Threads into
the world of Executors.

There is something in the world of Executors that doesn't seem to exist in the
world of Threads. I don't know how important that is though.

For instance, could it be the case that a thread might belong to more than a
single executor? Consider a "same thread" executor or wrapped executors. Or
consider the case where executors are hidden because of encapsulation. After all
there's a limited damage you could do having a reference to the current thread
or tying it up with your tasks unwisely. It might be a completely different
thing, if you had a reference to the executor.

Is it possible that this is not such a widespread or general problem that it
requires addressing on such a high level (java.lang.Thread)? Is it possible that
this is mostly a problem for the code that uses CompletableFutures? Could it be
that the most of that problem would be gone if the user had a simple way to say
"execute that task in the same executor that this current task is executing"[*]?
In which case there wouldn't be any need for a reference to the executor.

If so, then the scope of the problem seems a lot smaller.

[*] Modulo the root task which will either have to use some default setting or
    specify the executor explicitly.

> On 12 Mar 2019, at 17:47, Alexei Kaigorodov <[hidden email]> wrote:
>
> Pavel,
> I prepared a gist: https://gist.github.com/akaigoro/734553139c87bee98e1ef190eb5c0536
>
> Suppose we have a simple method asyncJob0 and call it asynchronously:
>     Executor exec = ForkJoinPool.commonPool();
>     CompletableFuture<String> cf0 = CompletableFuture.supplyAsync(CurrentExec::asyncJob0, exec);
>     System.out.println(cf0.get());
>
> Sooner or later, we want to refactor (parallelize) the method asyncJob0, but we do not want to change the place where it is called. This is a common use case when a library is evolving. So we change the method as follows (its name changed to asyncJob1 only for convenience, in real life it would not):
>
> static String asyncJob1() {
>     Executor exec = currentExecutor();
>     CompletableFuture<String> cf0 = CompletableFuture.supplyAsync(CurrentExec::asyncJob0, exec);
>     String part1 = "+asyncJob1"; // suppose this is a heavy computation
>     try {
>         return cf0.get()+part1;
>     } catch (Exception e) {
>         throw new RuntimeException(e);
>     }
> }
>
> So we made two parallel branches: one computes cf0 and the other computes part1. We want to compute cf0 on the same thread pool that is currently in use, simply because we have no other computational resources at hand: all resources are controlled by the user, and we do not want to bother them with resolving our problems. For this we call exec = currentExecutor(); the implementation in the gist works for ForkJoinPool but does not work for other pools (@Viktor Klang: checking whether the thread group implements Executor does not work for the result of Executors.newFixedThreadPool()).
>
> This solution is not ideal because it may block in the call to cf0.get(), but that is a consequence of the limitations of the CompletableFuture design: we have no other way to provide the result. The right solution would be to replace the single asynchronous procedure call of asyncJob0 with three: two parallel branches plus a third that computes the final result. This is possible when some other asynchronous frameworks are used.
>
>
> thanks
> Alexei
>
>
> On Tue, 12 Mar 2019 at 18:51, Pavel Rappo <[hidden email]> wrote:
> On Mon, Mar 11, 2019 at 5:48 PM Alexei Kaigorodov via
> Concurrency-interest <[hidden email]> wrote:
> >
> > To enable this, I propose to add new method to
> > the standard java API which would return the reference  to the thread pool
> > to which current thread belongs.
>
> If I were you I would make a good case for that first. Explain
> exhaustively why this feature is needed, with examples. List the
> alternatives and listen carefully to the feedback. Rinse, repeat. JEP
> is more of a procedural step compared to those previous steps and can
> be done later.
>
> -Pavel

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Pavel,
good points.
- Yes, delegating executors can exist, and a reference to the current executor taken from the current thread would then point to a different executor.

- It is not a problem for code that uses CompletableFutures, because with CompletableFutures user code has no access to the underlying Future anyway. The only way it can set the value of the underlying future is to return a computed value, and I am thinking of the use case where user code decides to delegate computation of the value to some other set of asynchronous procedures.

-<<problem would be gone if the user had a simple way to say "execute that task in the same executor that this current task is executing"[*]?>>
 Yes, sort of. I am looking for a way to set a default executor so that subsequently created asynchronous procedures would use it without it being passed explicitly. One way is to set it as a property of the current thread, and I thought a reference to the thread's enclosing executor would be a good fit. Now I see it is not, not only because of delegating executors, but also because a library may want to create asynchronous procedures that work on some predefined executor, independent of the executor from which that library is called.

Anyway, this is not a problem, but a desire to write less boilerplate code.
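
To make the "property of the current thread" idea concrete, here is a minimal sketch built on a ThreadLocal. The class name TrackingExecutor and its current() method are hypothetical and not part of any existing API. The wrapper records itself while a task runs, so code inside the task can look up "its" executor without receiving it as a parameter. The common pool as the default is an arbitrary choice for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

/** Wraps an executor and records itself in a ThreadLocal while each task runs. */
class TrackingExecutor implements Executor {
    private static final ThreadLocal<Executor> CURRENT = new ThreadLocal<>();
    private final Executor delegate;

    TrackingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    /** The executor running the current task, or the common pool if none is recorded. */
    static Executor current() {
        Executor e = CURRENT.get();
        return e != null ? e : ForkJoinPool.commonPool();
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(() -> {
            Executor previous = CURRENT.get();
            CURRENT.set(this);
            try {
                task.run();
            } finally {
                // Restore the previous value so nested wrappers unwind correctly.
                if (previous == null) CURRENT.remove(); else CURRENT.set(previous);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Executor tracked = new TrackingExecutor(pool);
        // The task itself asks which executor it is running on.
        Executor seen = CompletableFuture.supplyAsync(TrackingExecutor::current, tracked).get();
        System.out.println("task saw its own executor: " + (seen == tracked));
        pool.shutdown();
    }
}
```

Note that the recorded value is the wrapper, not the underlying pool, so the delegating-executor ambiguity is resolved by whoever installs the wrapper rather than by the thread.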

thanks,
Alexei


Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
The reason I used AtomicInteger is because I wanted a subtly different guarantee: I wanted to express that the execution should be asynchronous in the current thread only if the code is reached before return.

It does not mean that it is better than watching thisThread, but I would like to mention that it reduces the requirement of visibility of the value to this thread only - the writes to the flag do not need to be visible by the other threads; in fact, not seeing those writes alone would indicate we are not in the invocation of then.

The code I actually want would look the same as Peter’s suggestion with Threads, but I observe that in that case thisThread is “effectively final”, as javac puts it. So the compiler probably inserts barriers to ensure cross-thread visibility.

Is there a way to control the existence of such barriers?


Alex


On 12 Mar 2019, at 19:55, Peter Levart <[hidden email]> wrote:



On 3/12/19 5:00 PM, Alex Otenko via Concurrency-interest wrote:
I think what Peter wants is more like:

  public static <T, U> CompletableFuture<U> then(CompletableFuture<T> cf,
                                                 Function<T, U> f) {
    final AtomicInteger ai = new AtomicInteger();
    final CompletableFuture<U> that = cf.thenCompose(i -> ai.get() == 0 ?
                                                           cf.thenApplyAsync(f):
                                                           cf.thenApply(f));
    ai.set(1);
    return that;
  }

Not so much a ThreadLocal, but a witness of who got to thenCompose: this method, or someone concurrent.

Alex

Yes, the semantics of the above method are what I was thinking about. Now that Alex has (again) found the solution with thenCompose, I can modify it into a similar solution (without the harmless race) that is hopefully also easier to understand:

    public static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf, Function<? super T, ? extends U> fn) {
        Thread thisThread = Thread.currentThread();
        return cf.thenCompose(t -> Thread.currentThread() == thisThread
                                   ? cf.thenApplyAsync(fn)
                                   : cf.thenApply(fn));
    }

Which suggests that the solution is very easy to implement as static methods. It is not as pretty as CF instance methods would be, but will do for now.
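
A small, self-contained demo may help show the two paths of the thread-comparing helper. The helper body is taken from the message above. The class name ThenApplyDemo and the choice to return the executing thread from the function are assumptions made only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ThenApplyDemo {
    // The static helper from the message above, unchanged in behavior.
    static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf,
                                                 Function<? super T, ? extends U> fn) {
        Thread thisThread = Thread.currentThread();
        return cf.thenCompose(t -> Thread.currentThread() == thisThread
                                   ? cf.thenApplyAsync(fn)
                                   : cf.thenApply(fn));
    }

    public static void main(String[] args) throws Exception {
        Thread caller = Thread.currentThread();

        // Case 1: cf is already completed, so the lambda runs in the caller
        // and fn is handed to the default asynchronous executor.
        CompletableFuture<String> done = CompletableFuture.completedFuture("a");
        Thread ranOn = thenApply(done, s -> Thread.currentThread()).get();
        System.out.println("completed cf, fn ran off the caller thread: " + (ranOn != caller));

        // Case 2: cf is completed later by another thread, so fn runs
        // synchronously on that completing thread.
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Thread> result = thenApply(pending, s -> Thread.currentThread());
        Thread completer = new Thread(() -> pending.complete("b"));
        completer.start();
        System.out.println("pending cf, fn ran on the completer: " + (result.get() == completer));
    }
}
```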

Thanks,

Peter



On 12 Mar 2019, at 15:28, Doug Lea via Concurrency-interest <[hidden email]> wrote:

On 3/12/19 5:58 AM, Peter Levart via Concurrency-interest wrote:

    CompletableFuture cf = ....;
    ...

    cf.thenApply(Function, Executor)

The behaviour of this method would be: If the 'cf' is not completed yet,
it submits a task that will be completed synchronously in the same
thread that completes the 'cf'. If 'cf' is already completed, it submits
an asynchronous task to be executed in the provided Executor.


Thanks for the analysis. I agree that if we had this method (and its
variants), people would be unlikely to want a currentExecutor() method.
(Which is not quite to conclude yet that we should do it.)

And those who still do want some form of currentExecutor() could just
use a ThreadLocal.

Further opinions welcome though.

-Doug


Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Hi Alex,

On 3/18/19 2:37 PM, Alex Otenko wrote:
> The reason I used AtomicInteger is because I wanted a subtly different guarantee: I wanted to express that the execution should be asynchronous in the current thread only if the code is reached before return.
>
> It does not mean that it is better than watching thisThread, but I would like to mention that it reduces the requirement of visibility of the value to this thread only - the writes to the flag do not need to be visible by the other threads; in fact, not seeing those writes alone would indicate we are not in the invocation of then.
>
> The code I actually want would look the same as Peter’s suggestion with Threads, but I observe that in that case thisThread is “effectively final”, as javac puts it. So the compiler probably inserts barriers to ensure cross-thread visibility.
>
> Is there a way to control the existence of such barriers?

You may not require the visibility of the AtomicInteger value write from
this to the other thread, but you require visibility of the 'ai'
reference pointing to the AtomicInteger object in the same way as my
variant requires the visibility of 'thisThread' reference pointing to
the Thread object. Your variant dereferences the 'ai' reference when
reading its inner value while my variant just compares the reference
pointer to another Thread reference pointer.

And yes, the compiler treats these reference locals as effectively final
and "captures" them in the lambda object as final fields. So barriers are
inserted where they are needed for final fields. But I think
that CompletableFuture already guarantees a happens-before relationship
between invoking CF.thenCompose and the execution of the code in the provided
lambda, so barriers are inserted regardless, aren't they?

Regards, Peter

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list


On 3/18/19 2:37 PM, Alex Otenko wrote:
> The reason I used AtomicInteger is because I wanted a subtly different guarantee: I wanted to express that the execution should be asynchronous in the current thread only if the code is reached before return.
>
> It does not mean that it is better than watching thisThread, but I would like to mention that it reduces the requirement of visibility of the value to this thread only - the writes to the flag do not need to be visible by the other threads; in fact, not seeing those writes alone would indicate we are not in the invocation of then.
>
> The code I actually want would look the same as Peter’s suggestion with Threads, but I observe that in that case thisThread is “effectively final”, as javac puts it. So the compiler probably inserts barriers to ensure cross-thread visibility.
>
> Is there a way to control the existence of such barriers?

Perhaps this is the variant that requires no visibility across threads
(except for static final state):

   static final ThreadLocal<Boolean> THEN_APPLY_IN_PROGRESS = new ThreadLocal<>();

   public static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf, Function<? super T, ? extends U> fn)
   {
     THEN_APPLY_IN_PROGRESS.set(Boolean.TRUE);
     try {
       return cf.thenCompose(t -> Boolean.TRUE == THEN_APPLY_IN_PROGRESS.get()
                                  ? cf.thenApplyAsync(fn)
                                  : cf.thenApply(fn));
     }
     finally {
       THEN_APPLY_IN_PROGRESS.remove();
     }
   }

Peter

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list


On 3/18/19 4:25 PM, Peter Levart wrote:

>
>
> On 3/18/19 2:37 PM, Alex Otenko wrote:
>> The reason I used AtomicInteger is because I wanted a subtly
>> different guarantee: I wanted to express that the execution should be
>> asynchronous in the current thread only if the code is reached before
>> return.
>>
>> It does not mean that it is better than watching thisThread, but I
>> would like to mention that it reduces the requirement of visibility
>> of the value to this thread only - the writes to the flag do not need
>> to be visible by the other threads; in fact, not seeing those writes
>> alone would indicate we are not in the invocation of then.
>>
>> The code I actually want would look the same as Peter’s suggestion
>> with Threads, but I observe that in that case thisThread is
>> “effectively final”, as javac puts it. So the compiler probably
>> inserts barriers to ensure cross-thread visibility.
>>
>> Is there a way to control the existence of such barriers?
>
> Perhaps this is the variant that requires no visibility across threads
> (except for static final state):
>
>   static final ThreadLocal<Boolean> THEN_APPLY_IN_PROGRESS = new
> ThreadLocal<>();
>
>   public static <T, U> CompletableFuture<U>
> thenApply(CompletableFuture<T> cf, Function<? super T, ? extends U> fn)
>   {
>     THEN_APPLY_IN_PROGRESS.set(Boolean.TRUE);
>     try {
>       return cf.thenCompose(t -> Boolean.TRUE ==
> THEN_APPLY_IN_PROGRESS.get()
>                                  ? cf.thenApplyAsync(fn)
>                                  : cf.thenApply(fn));
>     }
>     finally {
>       THEN_APPLY_IN_PROGRESS.remove();
>     }
>   }
>

Not entirely true :~(. Of course, the lambda object reference has to be
visible to the other thread (as it is not a constant non-capturing
lambda which would be initialized just once) and with it all the
captured state:
- the reference to 'fn' Function
- the reference to 'cf' CompletableFuture

So capturing another reference doesn't make much difference.

Peter

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list
Well, yes, since the lambda is made visible to all threads, an extra reference is not a problem.

But this pattern actually has another dangerous property, which I wanted to avoid by using AtomicInteger. Although benign in most cases, it captures thisThread for the duration of the closure’s lifetime. This is a common problem with continuation passing: the scope of lifetime of references is no longer limited to lexical scope.

Alex

Re: proposal: Thread.currentExecutor()

JSR166 Concurrency mailing list


On 3/18/19 4:48 PM, Alex Otenko wrote:
> Well, yes, since the lambda is made visible to all threads, an extra reference is not a problem.
>
> But this pattern actually has another dangerous property, which I wanted to avoid by using AtomicInteger. Although benign in most cases, it captures thisThread for the duration of the closure’s lifetime. This is a common problem with continuation passing: the scope of lifetime of references is no longer limited to lexical scope.

Right, we cannot avoid capturing the 'cf' reference, because we need to
call .thenApply[Async] on it. The user already expects that the 'fn'
Function will be captured and stay alive until it is used. But we can at
least avoid capturing 'thisThread' and capture just its ID instead:

   public static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf, Function<? super T, ? extends U> fn)
   {
     long thisThreadId = Thread.currentThread().getId();
     return cf.thenCompose(t -> Thread.currentThread().getId() == thisThreadId
                                ? cf.thenApplyAsync(fn)
                                : cf.thenApply(fn));
   }
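
A minimal sketch of the one case where the thread-ID variant and the earlier ThreadLocal variant appear to differ: the calling thread registers the continuation on an incomplete future and later completes that future itself. The names viaThreadLocal, viaThreadId and ranOnCaller are hypothetical, and the sketch assumes the default async executor never runs tasks on the calling thread. Thread.getId() is used as in the post; newer JDKs also offer Thread.threadId().

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SameThreadLateCompletionDemo {
    static final ThreadLocal<Boolean> IN_PROGRESS = new ThreadLocal<>();

    // ThreadLocal variant: async only while the helper call is still on the stack.
    static <T, U> CompletableFuture<U> viaThreadLocal(CompletableFuture<T> cf,
                                                      Function<? super T, ? extends U> fn) {
        IN_PROGRESS.set(Boolean.TRUE);
        try {
            return cf.thenCompose(t -> Boolean.TRUE == IN_PROGRESS.get()
                                       ? cf.thenApplyAsync(fn)
                                       : cf.thenApply(fn));
        } finally {
            IN_PROGRESS.remove();
        }
    }

    // Thread-ID variant: async whenever the lambda runs on the registering thread.
    static <T, U> CompletableFuture<U> viaThreadId(CompletableFuture<T> cf,
                                                   Function<? super T, ? extends U> fn) {
        long thisThreadId = Thread.currentThread().getId();
        return cf.thenCompose(t -> Thread.currentThread().getId() == thisThreadId
                                   ? cf.thenApplyAsync(fn)
                                   : cf.thenApply(fn));
    }

    // Registers on an incomplete future, completes it from the same thread,
    // and reports whether fn ended up running on that thread.
    static boolean ranOnCaller(boolean useThreadId) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        Function<String, Thread> whichThread = v -> Thread.currentThread();
        CompletableFuture<Thread> result = useThreadId
                ? viaThreadId(cf, whichThread)
                : viaThreadLocal(cf, whichThread);
        cf.complete("x"); // the thenCompose lambda runs here, on the caller
        return result.join() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("ThreadLocal variant ran fn on caller: " + ranOnCaller(false));
        System.out.println("thread-ID variant ran fn on caller: " + ranOnCaller(true));
    }
}
```

With the ThreadLocal flag already removed, fn runs synchronously on the caller; with the ID comparison, the same thread is still recognised and fn goes to the async executor.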

The javadoc says that a thread ID may be reused after the thread terminates:

      * Returns the identifier of this Thread.  The thread ID is a positive
      * {@code long} number generated when this thread was created.
      * The thread ID is unique and remains unchanged during its lifetime.
      * When a thread is terminated, this thread ID may be reused.

...but the implementation of the generator for thread IDs does not care
about Thread's lifetime:

     /* For generating thread ID */
     private static long threadSeqNumber;

     private static synchronized long nextThreadID() {
         return ++threadSeqNumber;
     }

...so an ID is equally likely to be reused whether or not the thread has
terminated. In practice this would only happen once the 64-bit counter
wraps around, which takes about 584942 years if the JVM creates one
Thread every microsecond.
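
The figure can be reproduced with a short calculation, assuming a 365-day year and that reuse means the counter has gone through all 2^64 values. The class and method names are made up for illustration.

```java
import java.math.BigInteger;

class ThreadIdWrapEstimate {
    // Microseconds in a 365-day year (assumption: no leap days).
    static final long MICROS_PER_YEAR = 365L * 24 * 60 * 60 * 1_000_000;

    // Whole years needed to hand out 2^bits IDs at one ID per microsecond.
    static long yearsToExhaust(int bits) {
        return BigInteger.ONE.shiftLeft(bits)
                .divide(BigInteger.valueOf(MICROS_PER_YEAR))
                .longValueExact();
    }

    public static void main(String[] args) {
        // Full wrap-around of a 64-bit counter.
        System.out.println(yearsToExhaust(64)); // prints 584942
        // Point at which a signed long passes Long.MAX_VALUE and turns negative.
        System.out.println(yearsToExhaust(63)); // prints 292471
    }
}
```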

Peter

>
> Alex
>
>> On 18 Mar 2019, at 15:36, Peter Levart <[hidden email]> wrote:
>>
>>
>>
>> On 3/18/19 4:25 PM, Peter Levart wrote:
>>>
>>> On 3/18/19 2:37 PM, Alex Otenko wrote:
>>>> The reason I used AtomicInteger is because I wanted a subtly different guarantee: I wanted to express that the execution should be asynchronous in the current thread only if the code is reached before return.
>>>>
>>>> It does not mean that it is better than watching thisThread, but I would like to mention that it reduces the requirement of visibility of the value to this thread only - the writes to the flag do not need to be visible by the other threads; in fact, not seeing those writes alone would indicate we are not in the invocation of then.
>>>>
>>>> The code I actually want would look the same as Peter’s suggestion with Threads, but I observe that in that case thisThread is “effectively final”, as javac puts it. So the compiler probably inserts barriers to ensure cross-thread visibility.
>>>>
>>>> Is there a way to control the existence of such barriers?
>>> Perhaps this is the variant that requires no visibility across threads (except for static final state):
>>>
>>>    static final ThreadLocal<Boolean> THEN_APPLY_IN_PROGRESS = new ThreadLocal<>();
>>>
>>>    public static <T, U> CompletableFuture<U> thenApply(CompletableFuture<T> cf, Function<? super T, ? extends U> fn)
>>>    {
>>>      THEN_APPLY_IN_PROGRESS.set(Boolean.TRUE);
>>>      try {
>>>        return cf.thenCompose(t -> Boolean.TRUE == THEN_APPLY_IN_PROGRESS.get()
>>>                                   ? cf.thenApplyAsync(fn)
>>>                                   : cf.thenApply(fn));
>>>      }
>>>      finally {
>>>        THEN_APPLY_IN_PROGRESS.remove();
>>>      }
>>>    }
>>>
>> Not entirely true :~(. Of course, the lambda object reference has to be visible to the other thread (as it is not a constant non-capturing lambda which would be initialized just once) and with it all the captured state:
>> - the reference to 'fn' Function
>> - the reference to 'cf' CompletableFuture
>>
>> So capturing another reference doesn't make much difference.
>>
>> Peter
>
