CompletableFuture.exceptionallyAsync or handleCompose

classic Classic list List threaded Threaded
25 messages Options
12
Reply | Threaded
Open this post in threaded view
|

Re: CompletableFuture.exceptionallyAsync or handleCompose

JSR166 Concurrency mailing list
Regarding testing of default methods on CompletionStage...

If you produce tests for new methods in CompletableFuture, then the same tests can be used to test CompletionStage default methods simply by passing a subclass of CompletableFuture, overriding the new methods and delegating to CompletionStage methods. This can't be perfomed in Java, but can be performed using MethodHandle(s). For example:

/**
 * A CompletableFuture subclass that delegates execution of methods that
 * have defaults in {@link CompletionStage} to the default implementations.
 */
public class CompletableFutureUsingDefaults<T> extends CompletableFuture<T> {
    private static final MethodHandle exceptionallyAsync;
    private static final MethodHandle exceptionallyAsyncEx;
    private static final MethodHandle exceptionallyCompose;
    private static final MethodHandle exceptionallyComposeAsync;
    private static final MethodHandle exceptionallyComposeAsyncEx;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles
                .privateLookupIn(CompletionStage.class, MethodHandles.lookup());
            exceptionallyAsync = lookup.findSpecial(
                CompletionStage.class, "exceptionallyAsync",
                MethodType.methodType(CompletionStage.class, Function.class),
                CompletableFuture.class);
            exceptionallyAsyncEx = lookup.findSpecial(
                CompletionStage.class, "exceptionallyAsync",
                MethodType.methodType(CompletionStage.class, Function.class, Executor.class),
                CompletableFuture.class);
            exceptionallyCompose = lookup.findSpecial(
                CompletionStage.class, "exceptionallyCompose",
                MethodType.methodType(CompletionStage.class, Function.class),
                CompletableFuture.class);
            exceptionallyComposeAsync = lookup.findSpecial(
                CompletionStage.class, "exceptionallyComposeAsync",
                MethodType.methodType(CompletionStage.class, Function.class),
                CompletableFuture.class);
            exceptionallyComposeAsyncEx = lookup.findSpecial(
                CompletionStage.class, "exceptionallyComposeAsync",
                MethodType.methodType(CompletionStage.class, Function.class, Executor.class),
                CompletableFuture.class);
        } catch (IllegalAccessException e) {
            throw (Error) new IllegalAccessError(e.getMessage()).initCause(e);
        } catch (NoSuchMethodException e) {
            throw (Error) new NoSuchMethodError(e.getMessage()).initCause(e);
        }

    }

    private static RuntimeException unchecked(Throwable exception) {
        if (exception instanceof RuntimeException) {
            return (RuntimeException) exception;
        } else if (exception instanceof Error) {
            throw (Error) exception;
        } else {
            return new RuntimeException(exception);
        }
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new CompletableFutureUsingDefaults<>();
    }

    @SuppressWarnings("unchecked")
    public CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) {
        try {
            return (CompletableFuture<T>) (CompletionStage<T>)
                exceptionallyAsync.invokeExact((CompletableFuture<T>) this, fn);
        } catch (Throwable ex) {
            throw unchecked(ex);
        }
    }

    @SuppressWarnings("unchecked")
    public CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
        try {
            return (CompletableFuture<T>) (CompletionStage<T>)
                exceptionallyAsyncEx.invokeExact((CompletableFuture<T>) this, fn, executor);
        } catch (Throwable ex) {
            throw unchecked(ex);
        }
    }

    @SuppressWarnings("unchecked")
    public CompletableFuture<T> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn) {
        try {
            return (CompletableFuture<T>) (CompletionStage<T>)
                exceptionallyCompose.invokeExact((CompletableFuture<T>) this, fn);
        } catch (Throwable ex) {
            throw unchecked(ex);
        }
    }

    @SuppressWarnings("unchecked")
    public CompletableFuture<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn) {
        try {
            return (CompletableFuture<T>) (CompletionStage<T>)
                exceptionallyComposeAsync.invokeExact((CompletableFuture<T>) this, fn);
        } catch (Throwable ex) {
            throw unchecked(ex);
        }
    }

    @SuppressWarnings("unchecked")
    public CompletableFuture<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
        try {
            return (CompletableFuture<T>) (CompletionStage<T>)
                exceptionallyComposeAsyncEx.invokeExact((CompletableFuture<T>) this, fn, executor);
        } catch (Throwable ex) {
            throw unchecked(ex);
        }
    }
}


If a test with such class is on class-path, then it must be invoked with an additional java option:

    --add-opens java.base/java.util.concurrent=ALL-UNNAMED


The effects of testing such class should be the same as the effects of testing a CompletableFuture that didn't override the default methods.

Regards, Peter


On 09/19/2018 04:40 PM, Doug Lea wrote:
On 09/19/2018 04:49 AM, Peter Levart wrote:
...or help the type inference in the following way:

    default CompletionStage<T> exceptionallyAsync(Function<Throwable, ?
extends T> fn) {
        return handle(
            (r, ex) -> (ex == null)
                       ? this
                       : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1))
        ).thenCompose(Function.identity());
    }

This is a handy trick. I had mistaken inability to infer type as a need
for using a concrete type. Further simplifying using thenApplyAsync:

        return handle((r, ex) -> (ex == null) ? this
                      : this.<T>thenApplyAsync(x -> fn.apply(ex)))
            .thenCompose(Function.identity());

Something similar works with exceptionallyCompose:

     public default CompletionStage<T> exceptionallyCompose
        (Function<Throwable, ? extends CompletionStage<T>> fn) {
        return handle((r, ex) -> (ex == null) ? this : fn.apply(ex))
            .thenCompose(Function.identity());
    }

I haven't found a nice way yet to do so with exceptionallyComposeAsync.
Assuming we do, these would all make for less surprising default
implementations, but add enough overhead vs direct implementations that
CompletionStage authors would be motivated to override.

We'll also need to add separate default-implementation tests to TCK,
which will take some work.

-Doug



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: CompletableFuture.exceptionallyAsync or handleCompose

JSR166 Concurrency mailing list
Thanks, Peter - we may end up parameterizing tests with a
CompletableFutureFactory

We already use --add-opens java.base/java.util.concurrent=ALL-UNNAMED

On Wed, Sep 19, 2018 at 3:19 PM, Peter Levart via Concurrency-interest
<[hidden email]> wrote:

> Regarding testing of default methods on CompletionStage...
>
> If you produce tests for new methods in CompletableFuture, then the same
> tests can be used to test CompletionStage default methods simply by passing
> a subclass of CompletableFuture, overriding the new methods and delegating
> to CompletionStage methods. This can't be perfomed in Java, but can be
> performed using MethodHandle(s). For example:
>
> /**
>  * A CompletableFuture subclass that delegates execution of methods that
>  * have defaults in {@link CompletionStage} to the default implementations.
>  */
> public class CompletableFutureUsingDefaults<T> extends CompletableFuture<T>
> {
>     private static final MethodHandle exceptionallyAsync;
>     private static final MethodHandle exceptionallyAsyncEx;
>     private static final MethodHandle exceptionallyCompose;
>     private static final MethodHandle exceptionallyComposeAsync;
>     private static final MethodHandle exceptionallyComposeAsyncEx;
>
>     static {
>         try {
>             MethodHandles.Lookup lookup = MethodHandles
>                 .privateLookupIn(CompletionStage.class,
> MethodHandles.lookup());
>             exceptionallyAsync = lookup.findSpecial(
>                 CompletionStage.class, "exceptionallyAsync",
>                 MethodType.methodType(CompletionStage.class,
> Function.class),
>                 CompletableFuture.class);
>             exceptionallyAsyncEx = lookup.findSpecial(
>                 CompletionStage.class, "exceptionallyAsync",
>                 MethodType.methodType(CompletionStage.class, Function.class,
> Executor.class),
>                 CompletableFuture.class);
>             exceptionallyCompose = lookup.findSpecial(
>                 CompletionStage.class, "exceptionallyCompose",
>                 MethodType.methodType(CompletionStage.class,
> Function.class),
>                 CompletableFuture.class);
>             exceptionallyComposeAsync = lookup.findSpecial(
>                 CompletionStage.class, "exceptionallyComposeAsync",
>                 MethodType.methodType(CompletionStage.class,
> Function.class),
>                 CompletableFuture.class);
>             exceptionallyComposeAsyncEx = lookup.findSpecial(
>                 CompletionStage.class, "exceptionallyComposeAsync",
>                 MethodType.methodType(CompletionStage.class, Function.class,
> Executor.class),
>                 CompletableFuture.class);
>         } catch (IllegalAccessException e) {
>             throw (Error) new
> IllegalAccessError(e.getMessage()).initCause(e);
>         } catch (NoSuchMethodException e) {
>             throw (Error) new
> NoSuchMethodError(e.getMessage()).initCause(e);
>         }
>
>     }
>
>     private static RuntimeException unchecked(Throwable exception) {
>         if (exception instanceof RuntimeException) {
>             return (RuntimeException) exception;
>         } else if (exception instanceof Error) {
>             throw (Error) exception;
>         } else {
>             return new RuntimeException(exception);
>         }
>     }
>
>     @Override
>     public <U> CompletableFuture<U> newIncompleteFuture() {
>         return new CompletableFutureUsingDefaults<>();
>     }
>
>     @SuppressWarnings("unchecked")
>     public CompletableFuture<T> exceptionallyAsync(Function<Throwable, ?
> extends T> fn) {
>         try {
>             return (CompletableFuture<T>) (CompletionStage<T>)
>                 exceptionallyAsync.invokeExact((CompletableFuture<T>) this,
> fn);
>         } catch (Throwable ex) {
>             throw unchecked(ex);
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     public CompletableFuture<T> exceptionallyAsync(Function<Throwable, ?
> extends T> fn, Executor executor) {
>         try {
>             return (CompletableFuture<T>) (CompletionStage<T>)
>                 exceptionallyAsyncEx.invokeExact((CompletableFuture<T>)
> this, fn, executor);
>         } catch (Throwable ex) {
>             throw unchecked(ex);
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     public CompletableFuture<T> exceptionallyCompose(Function<Throwable, ?
> extends CompletionStage<T>> fn) {
>         try {
>             return (CompletableFuture<T>) (CompletionStage<T>)
>                 exceptionallyCompose.invokeExact((CompletableFuture<T>)
> this, fn);
>         } catch (Throwable ex) {
>             throw unchecked(ex);
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     public CompletableFuture<T>
> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>>
> fn) {
>         try {
>             return (CompletableFuture<T>) (CompletionStage<T>)
>                 exceptionallyComposeAsync.invokeExact((CompletableFuture<T>)
> this, fn);
>         } catch (Throwable ex) {
>             throw unchecked(ex);
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     public CompletableFuture<T>
> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>>
> fn, Executor executor) {
>         try {
>             return (CompletableFuture<T>) (CompletionStage<T>)
>
> exceptionallyComposeAsyncEx.invokeExact((CompletableFuture<T>) this, fn,
> executor);
>         } catch (Throwable ex) {
>             throw unchecked(ex);
>         }
>     }
> }
>
>
> If a test with such class is on class-path, then it must be invoked with an
> additional java option:
>
>     --add-opens java.base/java.util.concurrent=ALL-UNNAMED
>
>
> The effects of testing such class should be the same as the effects of
> testing a CompletableFuture that didn't override the default methods.
>
> Regards, Peter
>
>
> On 09/19/2018 04:40 PM, Doug Lea wrote:
>
> On 09/19/2018 04:49 AM, Peter Levart wrote:
>
> ...or help the type inference in the following way:
>
>     default CompletionStage<T> exceptionallyAsync(Function<Throwable, ?
> extends T> fn) {
>         return handle(
>             (r, ex) -> (ex == null)
>                        ? this
>                        : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1))
>         ).thenCompose(Function.identity());
>     }
>
> This is a handy trick. I had mistaken inability to infer type as a need
> for using a concrete type. Further simplifying using thenApplyAsync:
>
>         return handle((r, ex) -> (ex == null) ? this
>                       : this.<T>thenApplyAsync(x -> fn.apply(ex)))
>             .thenCompose(Function.identity());
>
> Something similar works with exceptionallyCompose:
>
>      public default CompletionStage<T> exceptionallyCompose
>         (Function<Throwable, ? extends CompletionStage<T>> fn) {
>         return handle((r, ex) -> (ex == null) ? this : fn.apply(ex))
>             .thenCompose(Function.identity());
>     }
>
> I haven't found a nice way yet to do so with exceptionallyComposeAsync.
> Assuming we do, these would all make for less surprising default
> implementations, but add enough overhead vs direct implementations that
> CompletionStage authors would be motivated to override.
>
> We'll also need to add separate default-implementation tests to TCK,
> which will take some work.
>
> -Doug
>
>
>
> _______________________________________________
> Concurrency-interest mailing list
> [hidden email]
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: CompletableFuture.exceptionallyAsync or handleCompose

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
I am not sure what “the specs don’t require it” means.

It would be very surprising, if not utterly wrong, if anyone implemented then* for CompletionStage that ended exceptionally.

Alex

On 19 Sep 2018, at 16:38, Doug Lea <[hidden email]> wrote:

On 09/19/2018 11:14 AM, Alex Otenko wrote:

On 19 Sep 2018, at 15:40, Doug Lea via Concurrency-interest <[hidden email]> wrote:

On 09/19/2018 04:49 AM, Peter Levart wrote:
...or help the type inference in the following way:

   default CompletionStage<T> exceptionallyAsync(Function<Throwable, ?
extends T> fn) {
       return handle(
           (r, ex) -> (ex == null)
                      ? this
                      : this.<T>handleAsync((r1, ex1) -> fn.apply(ex1))
       ).thenCompose(Function.identity());
   }


This is a handy trick. I had mistaken inability to infer type as a need
for using a concrete type. Further simplifying using thenApplyAsync:

      return handle((r, ex) -> (ex == null) ? this
                    : this.<T>thenApplyAsync(x -> fn.apply(ex)))
          .thenCompose(Function.identity());


Will thenApplyAsync apply in exceptional case? I don’t think it will?

Good point. The specs don't require it, so Peter's version is a better
choice.

-Doug


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: CompletableFuture.exceptionallyAsync or handleCompose

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list
For example,

   default CompletionStage<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn) {
       return thenApply(_r -> this).<CompletionStage<T>>exceptionallyAsync(fn).thenCompose(r -> r);
   }

is an alternative.

I am not competing for clarity (although to me that’s much clearer), but trying to demonstrate the expected behaviour to be compatible with this.

Because thenApply(_r -> this).thenCompose(r -> r) is expected to be Function.identity().

Alex

> On 19 Sep 2018, at 17:33, Alex Otenko <[hidden email]> wrote:
>
> It’s a common strategy to reduce CompletionStage<CompletionStage<T>> to CompletionStage<T>
>
> Alex
>
>> On 19 Sep 2018, at 16:40, Doug Lea via Concurrency-interest <[hidden email]> wrote:
>>
>> On 09/19/2018 11:18 AM, Peter Levart wrote:
>>
>>>> I haven't found a nice way yet to do so with exceptionallyComposeAsync.
>>>
>>> What about continuing with the nesting fashion:
>>>
>>>    default CompletionStage<T>
>>> exceptionallyComposeAsync(Function<Throwable, ? extends
>>> CompletionStage<T>> fn) {
>>>        return handle(
>>>            (r, ex) -> (ex == null)
>>>                       ? this
>>>                       : this.handleAsync((r1, ex1) -> fn.apply(ex1))
>>>                             .thenCompose(Function.identity())
>>>        ).thenCompose(Function.identity());
>>>    }
>>>
>>
>> Thanks. Yet another layer of wrapping, but at least it compiles!
>>
>> -Doug
>>
>>
>> _______________________________________________
>> Concurrency-interest mailing list
>> [hidden email]
>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: CompletableFuture.exceptionallyAsync or handleCompose

JSR166 Concurrency mailing list
In reply to this post by JSR166 Concurrency mailing list

Thanks all for the prods and help. This is now an openjdk CR and CSR,
heading for jdk12:
  https://bugs.openjdk.java.net/browse/JDK-8211010
  https://bugs.openjdk.java.net/browse/JDK-8210971

Also, thanks Peter for the MethodHandle approach (that we'll probably
someday somehow use) but the tck tests just use a
DelegatedCompletionStage that leaves out relays for new defaut methods.

And thanks Alex for posts about formulating defaults. We went with the
version Peter posted based on your initial suggestions. We could have
made some a little more concise, but these choices seem to create fewer
(or at least no more) intermediary objects of overhead.

-Doug

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
12