ConcurrentHashMapV8 Livelock on computeIfAbsent() ?


Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Millies, Sebastian

Hi Viktor,

 

I’m sorry, I’ve made a mistake: there hasn’t been a deadlock at all. When running the JMH benchmark I don’t get as much stack space as when running stand-alone, so the recursion depth of 6000 was causing a StackOverflowError. However, I didn’t see that error: it was swallowed by JMH. I just saw the benchmark hanging, with all the JMH worker threads parked, and jumped to the wrong conclusion.
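For what it’s worth, here is a minimal sketch of how one could hand the forked JMH JVM a bigger thread stack (the -Xss value and the benchmark body are illustrative assumptions, not my actual setup):

import java.math.BigInteger;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;

public class StackSizeSketch {

    // JMH runs benchmarks in forked JVMs, so the stack size has to be
    // passed to the fork explicitly; -Xss64m is a guess, tune as needed.
    @Benchmark
    @Fork(value = 1, jvmArgsAppend = {"-Xss64m"})
    public BigInteger cf6000() {
        return FibCached.test(6000).toCompletableFuture().join();
    }
}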

 

Anyway, computing the 3500th Fibonacci number, I consistently do not see any advantage of the async version over the synchronous one. In fact, it is the other way around:

 

2 Threads

Benchmark                                 Mode  Cnt   Score     Error  Units
FibCachedConcurrentBenchmark.cf3500         ss   20   4.132 ±   1.421  ms/op
FibCachedConcurrentBenchmark.cfAsync3500    ss   20   9.134 ±   0.862  ms/op
FibCachedConcurrentBenchmark.cf3500         ss   20   2.887 ±   0.571  ms/op
FibCachedConcurrentBenchmark.cfAsync3500    ss   20  10.345 ±  12.954  ms/op
FibCachedConcurrentBenchmark.cf3500         ss   20   3.500 ±   1.291  ms/op
FibCachedConcurrentBenchmark.cfAsync3500    ss   20   8.803 ±   1.679  ms/op

4 Threads

Benchmark                                 Mode  Cnt  Score   Error  Units
FibCachedConcurrentBenchmark.cf3500         ss   20  2.780 ± 0.430  ms/op
FibCachedConcurrentBenchmark.cfAsync3500    ss   20  8.850 ± 1.595  ms/op
FibCachedConcurrentBenchmark.cf3500         ss   20  3.034 ± 0.451  ms/op
FibCachedConcurrentBenchmark.cfAsync3500    ss   20  9.744 ± 1.669  ms/op
FibCachedConcurrentBenchmark.cf3500         ss   20  3.965 ± 1.380  ms/op
FibCachedConcurrentBenchmark.cfAsync3500    ss   20  8.430 ± 2.396  ms/op

 

Perhaps adding BigIntegers just isn’t expensive enough to warrant the overhead of going async.

 

-- Sebastian

 

PS: I think your code below doesn’t address the problem, namely, as you suggested, “to use the same Fib from multiple threads to show how it behaves under contention”. Your test(int) method below produces a new CF instance for each thread, so there is no contention. Or does it? Something like the hypothetical variant right below is what I would have expected.
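(A sketch only; the name testShared is made up. One shared instance means all concurrent callers hit the same cache:)

    // Illustrative: one shared CF, so concurrent callers actually
    // contend on the same ConcurrentHashMap.
    private static final CF SHARED_FIB = new CF(new ConcurrentHashMap<>());

    public static CompletionStage<BigInteger> testShared(final int n) {
        return SHARED_FIB.fib(n, ForkJoinPool.commonPool());
    }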

 

 

From: Viktor Klang [[hidden email]]
Sent: Friday, April 29, 2016 5:59 PM
To: Millies, Sebastian
Subject: Re: [concurrency-interest] ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

 

Hi Sebastian,

 

could it be a thread-pool issue?

 

This works just fine for me:

 

 

package yava.klang;

import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;

/**
 * Demonstrates ways of caching recursive functions.
 *
 * @author Andrew Haley, Viktor Klang, Sebastian Millies
 * @see triggered by <a href="http://concurrency.markmail.org/search/?q=#query:%20list%3Aedu.oswego.cs.concurrency-interest+page:3+mid:tf7xddfa6i6ow6d3+state:results">
 *      this discussion</a> on concurrency-interest
 */
public class FibCached {

    private static class Memoizer<T, R> {
        private final Map<T, R> memo;

        public Memoizer(Map<T, R> memo) {
            this.memo = memo;
        }

        public Function<T, R> memoize(Function<T, R> f) {
            return t -> {
                R r = memo.get(t);
                if (r == null) {
                    r = f.apply(t);
                    memo.put(t, r);
                }
                return r;
            };
        }
    }

    public static class FibonacciSimple {
        private final Memoizer<Integer, BigInteger> m;

        public FibonacciSimple(Map<Integer, BigInteger> cache) {
            m = new Memoizer<>(cache);
        }

        public BigInteger fib(int n) {
            if (n <= 2) return BigInteger.ONE;
            return m.memoize(this::fib).apply(n - 1).add(
                   m.memoize(this::fib).apply(n - 2));
        }
    }

    public static class CF {
        private static final CompletionStage<BigInteger> csOne = CompletableFuture.completedFuture(BigInteger.ONE);
        private final Map<Integer, CompletionStage<BigInteger>> cache;

        public CF(Map<Integer, CompletionStage<BigInteger>> cache) {
            this.cache = cache;
        }

        public CompletionStage<BigInteger> fib(int n) {
            if (n <= 2) return csOne;

            CompletionStage<BigInteger> ret = cache.get(n);
            if (ret == null) {
                final CompletableFuture<BigInteger> compute = new CompletableFuture<>();
                // Only the thread that wins the putIfAbsent race computes the value;
                // everyone else gets the shared, possibly still incomplete, future.
                ret = cache.putIfAbsent(n, compute);
                if (ret == null) {
                    ret = fib(n - 1).thenCompose(x ->
                          fib(n - 2).thenCompose(y -> {
                                compute.complete(x.add(y));
                                return compute;
                    }));
                }
            }
            return ret;
        }

        // Async version. It's entirely possible, and recommended, not to make the
        // first thenCompose an async one, as only the addition of x and y might be
        // "expensive" (for large values).
        public CompletionStage<BigInteger> fib(int n, Executor e) {
            if (n <= 2) return csOne;

            CompletionStage<BigInteger> ret = cache.get(n);
            if (ret == null) {
                final CompletableFuture<BigInteger> compute = new CompletableFuture<>();
                ret = cache.putIfAbsent(n, compute);
                if (ret == null) {
                    ret = fib(n - 1, e).thenComposeAsync(x ->
                          fib(n - 2, e).thenComposeAsync(y -> {
                                compute.complete(x.add(y));
                                return compute;
                    }, e));
                }
            }
            return ret;
        }
    }

    public static CompletionStage<BigInteger> test(final int n) {
        final CF fib = new CF(new ConcurrentHashMap<>());
        return fib.fib(n, ForkJoinPool.commonPool());
    }
}

 

On Thu, Apr 28, 2016 at 10:38 AM, Millies, Sebastian <[hidden email]> wrote:

Dropbox may not have been a good idea :( Here’s a thread dump from the deadlock in the cfAsync6000 benchmark:

 

2016-04-28 10:29:36
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):

"java8.concurrent.FibCachedConcurrentBenchmark.cfAsync6000-jmh-worker-2" #14 daemon prio=5 os_prio=0 tid=0x000000001fc95800 nid=0x2418 waiting on condition [0x000000001f62e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076bc429b8> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
        at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
        at java8.concurrent.FibCachedConcurrentBenchmark.cfAsync6000(FibCachedConcurrentBenchmark.java:76)
        at java8.concurrent.generated.FibCachedConcurrentBenchmark_cfAsync6000_jmhTest.cfAsync6000_ss_jmhStub(FibCachedConcurrentBenchmark_cfAsync6000_jmhTest.java:490)
        at java8.concurrent.generated.FibCachedConcurrentBenchmark_cfAsync6000_jmhTest.cfAsync6000_SingleShotTime(FibCachedConcurrentBenchmark_cfAsync6000_jmhTest.java:433)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:430)
        at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:412)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - <0x000000076b9cda18> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"java8.concurrent.FibCachedConcurrentBenchmark.cfAsync6000-jmh-worker-1" #13 daemon prio=5 os_prio=0 tid=0x000000001fc94800 nid=0x2414 waiting on condition [0x000000001f3df000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076b9517a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

"Service Thread" #10 daemon prio=9 os_prio=0 tid=0x000000001d97d000 nid=0x2404 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C1 CompilerThread3" #9 daemon prio=9 os_prio=2 tid=0x000000001c71f000 nid=0x1cb0 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread2" #8 daemon prio=9 os_prio=2 tid=0x000000001c71e000 nid=0x19cc waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread1" #7 daemon prio=9 os_prio=2 tid=0x000000001c71b000 nid=0x1d78 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"C2 CompilerThread0" #6 daemon prio=9 os_prio=2 tid=0x000000001d8db000 nid=0xfec waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x000000001d8d9800 nid=0x200c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x000000001d8d8000 nid=0x620 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x000000001c711000 nid=0x10a4 in Object.wait() [0x000000001ec7f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b208ee0> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x000000076b208ee0> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
        - None

"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x000000001c70a000 nid=0x1b48 in Object.wait() [0x000000001e91f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b206b50> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        - locked <0x000000076b206b50> (a java.lang.ref.Reference$Lock)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" #1 prio=5 os_prio=0 tid=0x000000000020f800 nid=0x1e14 waiting on condition [0x0000000002b1e000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076b9cd9f8> (a java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
        at java.util.concurrent.FutureTask.get(FutureTask.java:204)
        at org.openjdk.jmh.runner.BenchmarkHandler.runIteration(BenchmarkHandler.java:376)
        at org.openjdk.jmh.runner.BaseRunner.runBenchmark(BaseRunner.java:263)
        at org.openjdk.jmh.runner.BaseRunner.runBenchmark(BaseRunner.java:235)
        at org.openjdk.jmh.runner.BaseRunner.doSingle(BaseRunner.java:142)
        at org.openjdk.jmh.runner.BaseRunner.runBenchmarksForked(BaseRunner.java:76)
        at org.openjdk.jmh.runner.ForkedRunner.run(ForkedRunner.java:72)
        at org.openjdk.jmh.runner.ForkedMain.main(ForkedMain.java:84)

   Locked ownable synchronizers:
        - None

"VM Thread" os_prio=2 tid=0x000000001c701000 nid=0x2038 runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x000000000266e800 nid=0x1ba0 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x0000000002670000 nid=0x4a0 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x0000000002671800 nid=0xfb4 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x0000000002673000 nid=0x1b8 runnable
"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x0000000002676800 nid=0x1168 runnable
"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x0000000002677800 nid=0xa84 runnable
"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x000000000267b000 nid=0x17dc runnable
"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x000000000267c000 nid=0x2090 runnable

"VM Periodic Task Thread" os_prio=2 tid=0x000000001d981800 nid=0x2408 waiting on condition

JNI global references: 334

 

 

From: Viktor Klang [mailto:[hidden email]]
Sent: Wednesday, April 27, 2016 3:21 PM
To: Millies, Sebastian
Cc: concurrency-interest
Subject: Re: [concurrency-interest] ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

 

Do you have a thread dump? (sorry, I don't have any spare cycles to have a stab at running it right now)

 

On Wed, Apr 27, 2016 at 3:09 PM, Millies, Sebastian <[hidden email]> wrote:

I have added https://gist.github.com/smillies/0cceb17501f74c4f53bf4930eba61889#file-fibcachedconcurrentbenchmark-java to compute concurrent benchmarks of the 6000th Fibonacci number. Only the CF versions, of course.

 

The same Fib is used by two threads in each case; the pool for the async version gets another two threads.

 

The bad news is they’re prone to deadlock. Now, I am no expert in JMH; perhaps it’s just the way I’ve set up the tests. (I hope so.)

 

-- Sebastian

 

 




 

--

Cheers,



Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Viktor Klang

Hi Sebastian,

I suspect the use of thenComposeAsync is required to trampoline on top of the Executor's submission queue rather than exhausting the stack.
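A toy illustration of the dispatch difference I mean (class and thread names are made up; it just prints which thread runs each composition):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ComposeDispatchSketch {
    public static void main(String[] args) {
        ExecutorService e = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "trampoline");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);
        // thenCompose on an already-completed future runs the function
        // right here, on the caller's stack:
        done.thenCompose(x -> {
            System.out.println("thenCompose on " + Thread.currentThread().getName());
            return CompletableFuture.completedFuture(x);
        });
        // thenComposeAsync always re-submits the function to the executor's
        // queue, so the stack unwinds between steps:
        done.thenComposeAsync(x -> {
            System.out.println("thenComposeAsync on " + Thread.currentThread().getName());
            return CompletableFuture.completedFuture(x);
        }, e).join();
        e.shutdownNow();
    }
}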

I completely missed that you were testing the scalability, I'm sorry. Let me have a look later today or tomorrow.

Thanks for keeping the topic alive,

V

--
Cheers,


Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Dr Heinz M. Kabutz
Hi Sebastian and Viktor,

If you're going to look at scalability and also performance, you really should change the algorithm being used to calculate Fibonacci. I would suggest Dijkstra's sum of squares, and then choose a much larger number, for example Fib(1_000_000_000). The problem with the current algorithm is that you are doing too much work on the stack, and that is going to slow you down. In addition, BigInteger.add() is linear in the number of digits, as will probably be the work on the stack, so you'll end up with quadratic performance at best.

See http://www.javaspecialists.eu/archive/Issue236.html
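A rough sketch of the idea (this is not the newsletter's implementation; see the link above for that). It relies on the identities F(2k-1) = F(k)^2 + F(k-1)^2 and F(2k) = (2*F(k-1) + F(k)) * F(k), so only O(log n) distinct subproblems are needed instead of O(n):

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class FibDijkstra {
    private final Map<Integer, BigInteger> memo = new HashMap<>();

    public BigInteger fib(int n) {
        if (n == 0) return BigInteger.ZERO;
        if (n <= 2) return BigInteger.ONE;
        BigInteger cached = memo.get(n);
        if (cached != null) return cached;
        int k = (n + 1) / 2;                      // n is either 2k or 2k-1
        BigInteger fk = fib(k), fk1 = fib(k - 1);
        BigInteger result = (n % 2 == 1)
                ? fk.multiply(fk).add(fk1.multiply(fk1))   // F(2k-1)
                : fk1.shiftLeft(1).add(fk).multiply(fk);   // F(2k)
        memo.put(n, result);
        return result;
    }
}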
Regards

Heinz
-- 
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java(tm) Specialists' Newsletter"
Sun/Oracle Java Champion since 2005
JavaOne Rock Star Speaker 2012
http://www.javaspecialists.eu
Tel: +30 69 75 595 262
Skype: kabutz



Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Millies, Sebastian
In reply to this post by Viktor Klang

Hi Viktor,

 

As I’m getting StackOverflowErrors, I don’t think the code we’ve seen so far trampolines. However, below I show code that really does trampoline. That is, it combines concurrent memoization and trampolining. With it, I can get to about the 500,000th Fibonacci number on my box before all the cached BigInteger values cause an OutOfMemoryError. This code scales OK: I get single-shot times of about a quarter of a second for the 100,000th Fibonacci number with 2, 4, and 8 concurrent threads.

 

This is somewhat off on a tangent, I guess. Bouncing off an Executor queue inside CF is not the most efficient implementation of a trampoline. But I think it’s still interesting.

 

-- Sebastian

 

import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.math.BigInteger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Function;

import com.google.common.util.concurrent.ThreadFactoryBuilder; // Guava

static class ConcurrentTrampoliningMemoizer<T, R> {

  private static final Executor TRAMPOLINE = newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
  private final ConcurrentMap<T, CompletableFuture<R>> memo;

  public ConcurrentTrampoliningMemoizer(ConcurrentMap<T, CompletableFuture<R>> cache) {
    this.memo = cache;
  }

  public Function<T, CompletableFuture<R>> memoize(Function<T, CompletableFuture<R>> f) {
    return t -> {
      CompletableFuture<R> r = memo.get(t);
      if (r == null) {
        final CompletableFuture<R> compute = new CompletableFuture<>();
        r = memo.putIfAbsent(t, compute);
        if (r == null) {
          // supplyAsync bounces each recursive step off the executor's queue,
          // so the stack unwinds between steps (the actual trampoline).
          r = CompletableFuture.supplyAsync(() -> f.apply(t), TRAMPOLINE).thenCompose(Function.identity())
              .thenCompose(x -> {
                compute.complete(x);
                return compute;
              });
        }
      }
      return r;
    };
  }
}

static class Fibonacci {

  private final ConcurrentTrampoliningMemoizer<Integer, BigInteger> m;

  public Fibonacci(ConcurrentMap<Integer, CompletableFuture<BigInteger>> cache) {
    m = new ConcurrentTrampoliningMemoizer<>(cache);
  }

  public CompletableFuture<BigInteger> fib(int n) {
    if (n <= 2) return CompletableFuture.completedFuture(BigInteger.ONE);
    return m.memoize(this::fib).apply(n - 1).thenCompose(x ->
           m.memoize(this::fib).apply(n - 2).thenApply(y ->
             x.add(y)));
  }
}

BigInteger fib = new Fibonacci(new ConcurrentHashMap<>()).fib(500_000).join();

 


Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Viktor Klang
Hi Sebastian,

Controlling the bouncing from the outside seems desirable, and I think the following implementation might generate somewhat fewer allocations than the previous one:

package fib;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.math.BigInteger;

class ConcurrentTrampoliningMemoizer<T,R> {
  private final ConcurrentMap<T, CompletionStage<R>> memo;
  private final Executor executor;

  public ConcurrentTrampoliningMemoizer(final ConcurrentMap<T, CompletionStage<R>> cache, final Executor e) {
    this.memo = cache;
    this.executor = e;
  }

  public Function<T, CompletionStage<R>> memoize(final Function<T, CompletionStage<R>> f) {
    return t -> {
      CompletionStage<R> r = memo.get(t);
      if (r == null) {
        // Publish an incomplete future first, so concurrent callers for the
        // same key share a single computation.
        final CompletableFuture<R> compute = new CompletableFuture<>();
        r = memo.putIfAbsent(t, compute);
        if (r == null) {
          // Won the race: bounce the computation off the injected executor's
          // queue, then feed the result into the shared future.
          r = CompletableFuture.supplyAsync(() -> f.apply(t), executor).thenCompose(Function.identity())
              .thenCompose(x -> {
                compute.complete(x);
                return compute;
              });
        }
      }
      return r;
    };
  }
}
 
public final class FibCached {
  private final Function<Integer, CompletionStage<BigInteger>> m;
  private final CompletionStage<BigInteger> ONE = CompletableFuture.completedFuture(BigInteger.ONE);
 
  public FibCached(final ConcurrentMap<Integer, CompletionStage<BigInteger>> cache, final Executor executor) {
    m = new ConcurrentTrampoliningMemoizer<Integer, BigInteger>(cache, executor).memoize(this::fib);
  }
 
  public CompletionStage<BigInteger> fib(int n) {
    if (n <= 2) return ONE;
    else return m.apply(n - 1).thenCompose(x -> m.apply(n - 2).thenApply(y -> x.add(y)));
  }
}
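
A minimal wiring sketch for these classes (my own construction; the single-threaded daemon executor and the argument 100_000 are illustrative):

import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Share one FibCached (and hence one cache and one executor) across callers.
Executor trampoline = Executors.newSingleThreadExecutor(r -> {
  Thread t = new Thread(r, "fib-trampoline");
  t.setDaemon(true);
  return t;
});
FibCached fib = new FibCached(new ConcurrentHashMap<>(), trampoline);
BigInteger f = fib.fib(100_000).toCompletableFuture().join();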


Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Millies, Sebastian

Hi Viktor,

I don't get much more mileage. What's swamping the heap is really the BigIntegers, not the function instances (and ONE gets allocated only once anyway).

As for passing in an executor: on my system, performance degrades when I pass in anything but a single-threaded executor, so I'm not sure that extra parameter really helps.

-- Sebastian
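
The kind of comparison meant here, as a sketch (a hypothetical harness, not the actual JMH benchmark; millisFor and the executor choices are illustrative):

import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

// Time one cold run of fib(100_000) per injected executor.
static long millisFor(Executor e) {
  FibCached fib = new FibCached(new ConcurrentHashMap<>(), e);
  long t0 = System.nanoTime();
  fib.fib(100_000).toCompletableFuture().join();
  return (System.nanoTime() - t0) / 1_000_000;
}

// millisFor(Executors.newSingleThreadExecutor()); // fastest on my box
// millisFor(ForkJoinPool.commonPool());           // noticeably slower here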

 


Re: ConcurrentHashMapV8 Livelock on computeIfAbsent() ?

Viktor Klang

Hi Sebastian,

I always restructure code along my own neural pathways, so don't view all my changes as major optimizations. :)

The reason for passing in the Executor is to make it much easier to measure performance/scalability separately from implementation.

It would also be possible to create an Executor which trampolines on the current/calling thread.
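
For instance, a sketch of such a calling-thread trampoline (the class name and the ThreadLocal queue strategy are assumptions, not code from this thread):

import java.util.ArrayDeque;
import java.util.concurrent.Executor;

// Runs tasks on the submitting thread, but flattens nested execute() calls:
// tasks submitted while one is already running are queued and drained
// iteratively, so stack depth stays constant instead of growing per bounce.
final class TrampolineExecutor implements Executor {
  private final ThreadLocal<ArrayDeque<Runnable>> draining = new ThreadLocal<>();

  @Override public void execute(Runnable task) {
    ArrayDeque<Runnable> queue = draining.get();
    if (queue != null) {
      queue.add(task); // nested call: defer to the outer drain loop
    } else {
      queue = new ArrayDeque<>();
      draining.set(queue);
      try {
        task.run();
        for (Runnable next; (next = queue.poll()) != null; ) next.run();
      } finally {
        draining.remove();
      }
    }
  }
}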

I guess the next step is to change the encoding of the storage?

If we step back to the original topic: it does seem like the current approach is workable compared to computeIfAbsent.
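
For reference, the recursive computeIfAbsent shape that started this thread (a minimal sketch): ConcurrentHashMap's contract forbids the mapping function from updating the same map, so this can lock up while a bin is held, or throw IllegalStateException ("Recursive update") on newer JDKs.

import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;

class FibUnsafe {
  private final ConcurrentHashMap<Integer, BigInteger> cache = new ConcurrentHashMap<>();

  BigInteger fib(int n) {
    if (n <= 2) return BigInteger.ONE;
    // Unsafe: the lambda re-enters the map recursively while computeIfAbsent
    // holds the bin -- unsupported, and liable to hang (the original report).
    return cache.computeIfAbsent(n, k -> fib(k - 1).add(fib(k - 2)));
  }
}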

--
Cheers,
