Using custom ThreadPool inside parallelstream causing deadlock


JSR166 Concurrency mailing list
The following code causes a deadlock. In the sample, I process a list of
Strings by calling the processString method in parallel via parallelStream
(on the default common ForkJoinPool). processString in turn processes each
character in parallel via parallelStream, but this time on a custom thread
pool. The code deadlocks, but not always: only about once in 5-6 runs.

As the output logs show, processCharacter does run on the 2 threads of the
custom ExecutorService, so I cannot see why the program deadlocks. Three
threads of the common ForkJoinPool are occupied by processString, but the 2
threads of the custom pool should still be able to finish processing the
characters, so progress should not stop. It looks like I am missing something
about how parallelStream behaves on a custom thread pool, because the deadlock
never happens if I use a ForkJoinPool instead of a FixedThreadPool.

My initial conjecture is that it has something to do with calling future.get()
in the outer parallelStream, which blocks the ForkJoin threads, but then why
does it happen only once in 5-6 runs rather than every time?
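
To check this conjecture while the program hangs, the state of the common pool
can also be printed from a small daemon watchdog (a sketch only;
startCommonPoolWatchdog is just an illustrative helper to add to the class
below, and the jstack thread dump at the end of this mail remains the
authoritative picture):

    // Sketch: call startCommonPoolWatchdog() at the top of main(). It prints the
    // common pool's state once per second, which shows whether all of its
    // workers are parked in future.get() when the program stops making progress.
    private static void startCommonPoolWatchdog() {
        Thread watchdog = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println(java.util.concurrent.ForkJoinPool.commonPool());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        watchdog.setDaemon(true);
        watchdog.start();
    }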

*Code:*
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.MoreExecutors;

    public class ForkJoinTest {
      static final ExecutorService originalExecutor = Executors.newFixedThreadPool(2);
      static final ExecutorService EXECUTOR = MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) originalExecutor, 5, TimeUnit.SECONDS);

      public static void main(String[] args) {
        System.out.println("Forkjoin pool size: "
            + (Runtime.getRuntime().availableProcessors() - 1));
        final List<String> strings = listOfRandomString(10000);
        strings.parallelStream().forEach(str -> processString(str));
      }

      private static void processString(final String string) {
        try {
          System.out.println("Processing string pause: " + string + ": "
              + Thread.currentThread().getName());
          System.out.println("Processing string resume: " + string);
          final List<Character> chars = new ArrayList<>();
          for (Character ch : string.toCharArray()) {
            chars.add(ch);
          }
          final Runnable updateTask = () ->
              chars.parallelStream().forEach(ch -> processCharacter(string, ch));
          Future<?> future = EXECUTOR.submit(updateTask);
          System.out.println("Wait of Future on Thread: "
              + Thread.currentThread().getName());
          future.get();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }

      private static void processCharacter(final String str, final Character character) {
        try {
          // Note: there is deliberately no space before "Processing string pause",
          // which is why the log lines below read "processing character: 3Processing ...".
          System.out.println("processing character: " + character
              + "Processing string pause: " + str + ": "
              + Thread.currentThread().getName());
          Thread.sleep(2);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }

      private static List<String> listOfRandomString(final int size) {
        final List<String> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
          list.add(UUID.randomUUID().toString());
        }
        return list;
      }
    }


*When the code deadlocks, it produces the following output:*

    Forkjoin pool size: 3
    Processing string pause: 706fdcc5-235d-4513-aa73-ccc5ff3b7c96:
ForkJoinPool.commonPool-worker-2
    Processing string pause: 19814b98-8326-4531-b9f3-535c5a646076:
ForkJoinPool.commonPool-worker-3
    Processing string resume: 19814b98-8326-4531-b9f3-535c5a646076
    Processing string pause: 2f2a3e3f-3def-4663-b024-0560f374e5fb:
ForkJoinPool.commonPool-worker-1
    Processing string resume: 2f2a3e3f-3def-4663-b024-0560f374e5fb
    Processing string pause: 21c38331-7010-479b-bf8a-67e0557fee22: main
    Processing string resume: 706fdcc5-235d-4513-aa73-ccc5ff3b7c96
    Processing string resume: 21c38331-7010-479b-bf8a-67e0557fee22
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-2
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-3
    Wait of Future on Thread: main
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-1
    processing character: 3Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 4Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: -Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: -Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 5Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: eProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 3Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: fProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 5Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 3Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: dProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 2Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 6Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 4Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: eProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 4Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 0Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 6Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 2Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 9Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 0Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: fProcessing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 5Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: -Processing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: bProcessing string pause:
19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
    processing character: 6Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: -Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: bProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 3Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: eProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 3Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: fProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: -Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 2Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: aProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 2Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: fProcessing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 6Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 6Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 3Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: -Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
    processing character: 4Processing string pause:
2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2


*When the code does not deadlock, the output looks like this:*

    Forkjoin pool size: 3
    Processing string pause: 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68:
ForkJoinPool.commonPool-worker-1
    Processing string pause: a1311746-ba82-4541-9fbd-37d25857944d:
ForkJoinPool.commonPool-worker-3
    Processing string resume: a1311746-ba82-4541-9fbd-37d25857944d
    Processing string pause: 4e5f6b79-bd48-4328-8d5c-252781bf9359:
ForkJoinPool.commonPool-worker-2
    Processing string resume: 4e5f6b79-bd48-4328-8d5c-252781bf9359
    Processing string pause: 48ebca74-2fa9-486b-9467-8b40b3f2617f: main
    Processing string resume: 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68
    Processing string resume: 48ebca74-2fa9-486b-9467-8b40b3f2617f
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-1
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-3
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-2
    Wait of Future on Thread: main
    processing character: cProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 7Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: -Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: -Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 2Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: bProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: bProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 5Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 9Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 2Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: dProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: fProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 5Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: bProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: -Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: -Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 8Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: fProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 0Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 9Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 5Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: eProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 3Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 6Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 5Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 9Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 1Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: cProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: fProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: bProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 7Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 7Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 8Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: cProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 4Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: fProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 8Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: bProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: fProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: dProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: bProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 3Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: dProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 2Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 8Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 0Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: -Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: -Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 4Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 4Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 6Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: eProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: bProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: bProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: aProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 7Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 9Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: -Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: -Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 5Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: aProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: fProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: 8Processing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: 4Processing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    processing character: cProcessing string pause:
8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
    processing character: eProcessing string pause:
4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
    Processing string pause: 58fa7ebd-6f4a-4ec0-8515-84148d870b0c:
ForkJoinPool.commonPool-worker-1
    Processing string resume: 58fa7ebd-6f4a-4ec0-8515-84148d870b0c
    processing character: dProcessing string pause:
a1311746-ba82-4541-9fbd-37d25857944d: pool-1-thread-1
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-1
    Processing string pause: b941108a-c7a9-499e-b951-e94206ffa553:
ForkJoinPool.commonPool-worker-2
    Processing string resume: b941108a-c7a9-499e-b951-e94206ffa553
    Wait of Future on Thread: ForkJoinPool.commonPool-worker-2
    ....

*Thread dump for the deadlocked run:*

    "main" #1 prio=5 os_prio=31 tid=0x00007fba84002000 nid=0x1b03 waiting on
condition [0x000070000b253000]
       java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076d8bc370> (a
java.util.concurrent.FutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
    at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
    at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at
java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at com.amazon.ForkJoinTest.main(ForkJoinTest.java:29)
   
   
    "pool-1-thread-2" #15 daemon prio=5 os_prio=31 tid=0x00007fba831a0000
nid=0x5703 in Object.wait() [0x000070000c58c000]
       java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076db792e8> (a
java.util.stream.ForEachOps$ForEachTask)
    at
java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
    - locked <0x000000076db792e8> (a
java.util.stream.ForEachOps$ForEachTask)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at com.amazon.ForkJoinTest.lambda$processString$2(ForkJoinTest.java:43)
    at com.amazon.ForkJoinTest$$Lambda$2/581138573.run(Unknown Source)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
   
   
    "pool-1-thread-1" #14 daemon prio=5 os_prio=31 tid=0x00007fba83199800
nid=0x5503 in Object.wait() [0x000070000c489000]
       java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076dad5480> (a
java.util.stream.ForEachOps$ForEachTask)
    at
java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
    - locked <0x000000076dad5480> (a
java.util.stream.ForEachOps$ForEachTask)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at com.amazon.ForkJoinTest.lambda$processString$2(ForkJoinTest.java:43)
    at com.amazon.ForkJoinTest$$Lambda$2/581138573.run(Unknown Source)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
   
   
    "ForkJoinPool.commonPool-worker-3" #13 daemon prio=5 os_prio=31
tid=0x00007fba83197000 nid=0x5303 waiting on condition [0x000070000c386000]
       java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076da34ad0> (a
java.util.concurrent.FutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
    at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
    at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at
java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   
   
    "ForkJoinPool.commonPool-worker-2" #12 daemon prio=5 os_prio=31
tid=0x00007fba83831000 nid=0x5103 waiting on condition [0x000070000c283000]
       java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076d985fd0> (a
java.util.concurrent.FutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
    at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
    at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at
java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   
   
    "ForkJoinPool.commonPool-worker-1" #11 daemon prio=5 os_prio=31
tid=0x00007fba8395c000 nid=0x4f03 waiting on condition [0x000070000c180000]
       java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076d8da2d0> (a
java.util.concurrent.FutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
    at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
    at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
    at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at
java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   
   
    "Monitor Ctrl-Break" #5 daemon prio=5 os_prio=31 tid=0x00007fba840c9000
nid=0x4303 runnable [0x000070000bb6e000]
       java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    - locked <0x000000076eb0e968> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    - locked <0x000000076eb0e968> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at
com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:64
   

   




Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
Hi Akhil,

as far as I can see, it is a classic resource deadlock. We are calling
get() inside the parallel code, which blocks the common ForkJoin pool
threads.

If, instead of a fixed thread pool, we use a ForkJoinPool for the
originalExecutor, there is a special mechanism that creates additional
threads to keep the parallelism high, similar to how the ManagedBlocker
works in Phaser and CompletableFuture.
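
For completeness, the blocking get() itself could also be wrapped in a
ManagedBlocker, which tells the common pool that the worker is about to block
so it may start a spare thread; a minimal sketch (FutureBlocker is only an
illustrative name, not an existing class):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;

    // Wraps a Future so that waiting for it from a ForkJoin worker goes through
    // ForkJoinPool.managedBlock(), allowing the pool to compensate with a spare
    // worker while this thread is parked.
    final class FutureBlocker implements ForkJoinPool.ManagedBlocker {
        private final Future<?> future;

        FutureBlocker(Future<?> future) {
            this.future = future;
        }

        @Override
        public boolean block() throws InterruptedException {
            try {
                future.get();            // may park the current thread
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
            return true;                 // no further blocking needed
        }

        @Override
        public boolean isReleasable() {
            return future.isDone();      // already done, no need to block
        }
    }

    // In processString, instead of calling future.get() directly:
    //     ForkJoinPool.managedBlock(new FutureBlocker(future));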

If you add this at the top of main() you will see the difference:

    Runtime.getRuntime().addShutdownHook(new Thread(() ->
        System.out.println("ForkJoinPool.commonPool() = " + ForkJoinPool.commonPool())));


With FixedThreadPool, size == parallelism.  No additional threads were
created during the blocking get() operations.

    ForkJoinPool.commonPool() = java.util.concurrent.ForkJoinPool@682a0b20[Running, parallelism = 3, size = 3, active = 3, running = 0, steals = 0, tasks = 4, submissions = 9]

With ForkJoinPool, size >= parallelism.  If one of the parallel stream
threads blocks, another is created to keep the parallelism high:

    ForkJoinPool.commonPool() = java.util.concurrent.ForkJoinPool@1b28cdfa[Running, parallelism = 3, size = 15, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
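
Concretely, the swap only touches the executor declaration, for example
(a sketch; the class name is just for illustration, and the Guava
getExitingExecutorService wrapper is dropped because ForkJoinPool is not a
ThreadPoolExecutor and its workers are daemon threads anyway):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ForkJoinPool;

    public class ForkJoinTestExecutors {
        // Same target parallelism of 2, but as a ForkJoinPool: it can start
        // temporary spare threads while one of its tasks is blocked, so the
        // inner parallel streams keep making progress.
        static final ExecutorService EXECUTOR = new ForkJoinPool(2);
    }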

Regards

Heinz
--
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java™ Specialists' Newsletter" - www.javaspecialists.eu
Java Champion - www.javachampions.org
JavaOne Rock Star Speaker
Tel: +30 69 75 595 262
Skype: kabutz



akhilpratap1991 via Concurrency-interest wrote:

> The following code is causing the deadlock. In the sample code, I am
> processing a list of String  calling processString method in parallel using
> parallelStream (using Default common ForkJoin pool). The processString
> method in turn calls the processCharacter in parallel using parallelStream
> but this time in custom ThreadPool. The code gets deadlocked but not always.
> Only once in 5-6 runs it gets deadlocked.
>
> In parallelStream, using custom Thread, processCharacter is executing on 2
> threads of custom ExecutorService (as obvious from output logs), so I am not
> able to understand why it is getting deadlocked. 3 threads of
> Common-ForkJoinPool are occupied by processString but 2 Threads of
> CustomerPool should be able to complete processing of each character. So,
> processing should not be blocked. It looks like I am missing something when
> parallelStream is using custom ThreadPool as deadlock never happens if I am
> using ForkJoinPool instead of FixedThreadPool.
>
> My initial conjecture is that is something to do with calling future.get()
> on outer parallelStream which is blocking ForkJoinThreads but why it is not
> happening always and only once in 5-6 runs.
>
> *Code: *
>     public class ForkJoinTest {
>       static final ExecutorService originalExecutor =
> Executors.newFixedThreadPool(2);
>       static final ExecutorService EXECUTOR =
> MoreExecutors.getExitingExecutorService(
>             (ThreadPoolExecutor) originalExecutor, 5, TimeUnit.SECONDS);
>
>       public static void main(String[] args) {
>         System.out
>                 .println("Forkjoin pool size: " +
> (Runtime.getRuntime().availableProcessors() - 1));
>         final List<String> strings = listOfRandomString(10000);
>         strings.parallelStream().forEach(str -> processString(str));
>       }
>
>       private static void processString(final String string) {
>         try {
>             System.out.println("Processing string pause: " + string + ": " +
> Thread.currentThread()
>                     .getName());
>             System.out.println("Processing string resume: " + string);
>             final List<Character> chars = new ArrayList<>();
>             for (Character ch : string.toCharArray()) {
>                 chars.add(ch);
>             }
>             final Runnable updateTask = () -> {
>                 chars.parallelStream().forEach(ch ->
> processCharacter(string,
>                         ch));
>             };
>             Future future = EXECUTOR.submit(updateTask);
>             System.out.println("Wait of Future on Thread: " +
> Thread.currentThread().getName());
>             future.get();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     private static void processCharacter(final String str, final Character
> character) {
>         try {
>             System.out
>                     .println("processing character: " + character +
> "Processing string pause: " + str + ": " + Thread
>                             .currentThread()
>                             .getName());
>             Thread.sleep(2);
>
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>       }
>
>       private static List<String> listOfRandomString(final int size) {
>         final List<String> list = new ArrayList<>(size);
>         for (int i = 0; i < size; i++) {
>             list.add(UUID.randomUUID().toString());
>         }
>         return list;
>       }
>     }
>
>
> *When the code runs with deadlock, it produces following output:*
>
>     Forkjoin pool size: 3
>     Processing string pause: 706fdcc5-235d-4513-aa73-ccc5ff3b7c96:
> ForkJoinPool.commonPool-worker-2
>     Processing string pause: 19814b98-8326-4531-b9f3-535c5a646076:
> ForkJoinPool.commonPool-worker-3
>     Processing string resume: 19814b98-8326-4531-b9f3-535c5a646076
>     Processing string pause: 2f2a3e3f-3def-4663-b024-0560f374e5fb:
> ForkJoinPool.commonPool-worker-1
>     Processing string resume: 2f2a3e3f-3def-4663-b024-0560f374e5fb
>     Processing string pause: 21c38331-7010-479b-bf8a-67e0557fee22: main
>     Processing string resume: 706fdcc5-235d-4513-aa73-ccc5ff3b7c96
>     Processing string resume: 21c38331-7010-479b-bf8a-67e0557fee22
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-2
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-3
>     Wait of Future on Thread: main
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-1
>     processing character: 3Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 4Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: -Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: -Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 5Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: eProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 3Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: fProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 5Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 3Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: dProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 2Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 6Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 4Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: eProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 4Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 0Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 6Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 2Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 9Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 0Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: fProcessing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 5Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: -Processing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: bProcessing string pause:
> 19814b98-8326-4531-b9f3-535c5a646076: pool-1-thread-1
>     processing character: 6Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: -Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: bProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 3Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: eProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 3Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: fProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: -Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 2Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: aProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 2Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: fProcessing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 6Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 6Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 3Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: -Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>     processing character: 4Processing string pause:
> 2f2a3e3f-3def-4663-b024-0560f374e5fb: pool-1-thread-2
>
>
> *When code does not deadlocks, output is something like this:*
>
>     Forkjoin pool size: 3
>     Processing string pause: 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68:
> ForkJoinPool.commonPool-worker-1
>     Processing string pause: a1311746-ba82-4541-9fbd-37d25857944d:
> ForkJoinPool.commonPool-worker-3
>     Processing string resume: a1311746-ba82-4541-9fbd-37d25857944d
>     Processing string pause: 4e5f6b79-bd48-4328-8d5c-252781bf9359:
> ForkJoinPool.commonPool-worker-2
>     Processing string resume: 4e5f6b79-bd48-4328-8d5c-252781bf9359
>     Processing string pause: 48ebca74-2fa9-486b-9467-8b40b3f2617f: main
>     Processing string resume: 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68
>     Processing string resume: 48ebca74-2fa9-486b-9467-8b40b3f2617f
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-1
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-3
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-2
>     Wait of Future on Thread: main
>     processing character: cProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 7Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: -Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: -Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 2Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: bProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: bProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 5Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 9Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 2Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: dProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: fProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 5Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: bProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: -Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: -Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 8Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: fProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 0Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 9Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 5Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: eProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 3Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 6Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 5Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 9Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 1Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: cProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: fProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: bProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 7Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 7Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 8Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: cProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 4Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: fProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 8Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: bProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: fProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: dProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: bProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 3Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: dProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 2Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 8Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 0Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: -Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: -Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 4Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 4Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 6Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: eProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: bProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: bProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: aProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 7Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 9Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: -Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: -Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 5Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: aProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: fProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: 8Processing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: 4Processing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     processing character: cProcessing string pause:
> 8c8aeba8-fbf8-4d80-8fb7-bb97ccf05e68: pool-1-thread-1
>     processing character: eProcessing string pause:
> 4e5f6b79-bd48-4328-8d5c-252781bf9359: pool-1-thread-2
>     Processing string pause: 58fa7ebd-6f4a-4ec0-8515-84148d870b0c:
> ForkJoinPool.commonPool-worker-1
>     Processing string resume: 58fa7ebd-6f4a-4ec0-8515-84148d870b0c
>     processing character: dProcessing string pause:
> a1311746-ba82-4541-9fbd-37d25857944d: pool-1-thread-1
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-1
>     Processing string pause: b941108a-c7a9-499e-b951-e94206ffa553:
> ForkJoinPool.commonPool-worker-2
>     Processing string resume: b941108a-c7a9-499e-b951-e94206ffa553
>     Wait of Future on Thread: ForkJoinPool.commonPool-worker-2
>     ....
>
> *Thread dump for the deadlocked run:*
>
>     "main" #1 prio=5 os_prio=31 tid=0x00007fba84002000 nid=0x1b03 waiting on
> condition [0x000070000b253000]
>        java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000076d8bc370> (a
> java.util.concurrent.FutureTask)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>     at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
>     at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
>     at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
>     at
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>     at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
>     at
> java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
>     at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
>     at
> java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
>     at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
>     at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>     at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
>     at
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
>     at com.amazon.ForkJoinTest.main(ForkJoinTest.java:29)
>    
>    
>     "pool-1-thread-2" #15 daemon prio=5 os_prio=31 tid=0x00007fba831a0000
> nid=0x5703 in Object.wait() [0x000070000c58c000]
>        java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x000000076db792e8> (a
> java.util.stream.ForEachOps$ForEachTask)
>     at
> java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
>     - locked <0x000000076db792e8> (a
> java.util.stream.ForEachOps$ForEachTask)
>     at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
>     at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
>     at
> java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
>     at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
>     at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>     at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
>     at
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
>     at com.amazon.ForkJoinTest.lambda$processString$2(ForkJoinTest.java:43)
>     at com.amazon.ForkJoinTest$$Lambda$2/581138573.run(Unknown Source)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>    
>    
>     "pool-1-thread-1" #14 daemon prio=5 os_prio=31 tid=0x00007fba83199800
> nid=0x5503 in Object.wait() [0x000070000c489000]
>        java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x000000076dad5480> (a
> java.util.stream.ForEachOps$ForEachTask)
>     at
> java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
>     - locked <0x000000076dad5480> (a
> java.util.stream.ForEachOps$ForEachTask)
>     at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
>     at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
>     at
> java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
>     at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
>     at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>     at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
>     at
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
>     at com.amazon.ForkJoinTest.lambda$processString$2(ForkJoinTest.java:43)
>     at com.amazon.ForkJoinTest$$Lambda$2/581138573.run(Unknown Source)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>    
>    
>     "ForkJoinPool.commonPool-worker-3" #13 daemon prio=5 os_prio=31
> tid=0x00007fba83197000 nid=0x5303 waiting on condition [0x000070000c386000]
>        java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000076da34ad0> (a
> java.util.concurrent.FutureTask)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>     at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
>     at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
>     at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
>     at
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>     at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
>     at
> java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>    
>    
>     "ForkJoinPool.commonPool-worker-2" #12 daemon prio=5 os_prio=31 tid=0x00007fba83831000 nid=0x5103 waiting on condition [0x000070000c283000]
>        java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000076d985fd0> (a java.util.concurrent.FutureTask)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>     at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
>     at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
>     at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
>     at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>    
>    
>     "ForkJoinPool.commonPool-worker-1" #11 daemon prio=5 os_prio=31 tid=0x00007fba8395c000 nid=0x4f03 waiting on condition [0x000070000c180000]
>        java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000076d8da2d0> (a java.util.concurrent.FutureTask)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>     at com.amazon.ForkJoinTest.processString(ForkJoinTest.java:46)
>     at com.amazon.ForkJoinTest.lambda$main$0(ForkJoinTest.java:29)
>     at com.amazon.ForkJoinTest$$Lambda$1/806353501.accept(Unknown Source)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
>     at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>    
>    
>     "Monitor Ctrl-Break" #5 daemon prio=5 os_prio=31 tid=0x00007fba840c9000 nid=0x4303 runnable [0x000070000bb6e000]
>        java.lang.Thread.State: RUNNABLE
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>     at java.net.SocketInputStream.read(SocketInputStream.java:171)
>     at java.net.SocketInputStream.read(SocketInputStream.java:141)
>     at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
>     at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
>     at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>     - locked <0x000000076eb0e968> (a java.io.InputStreamReader)
>     at java.io.InputStreamReader.read(InputStreamReader.java:184)
>     at java.io.BufferedReader.fill(BufferedReader.java:161)
>     at java.io.BufferedReader.readLine(BufferedReader.java:324)
>     - locked <0x000000076eb0e968> (a java.io.InputStreamReader)
>     at java.io.BufferedReader.readLine(BufferedReader.java:389)
>     at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:64)
>    
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
Heinz,

I know and understand that using a ForkJoinPool would solve the problem.
My concern here is why my code deadlocks at all, given that two threads
of the custom fixed thread pool are free to take care of the inner
parallelStream, and why it does not deadlock every time.
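
For completeness, the workaround being referred to (running the inner parallel stream inside a dedicated ForkJoinPool rather than a fixed thread pool) looks roughly like the sketch below. The class name and pool size are illustrative, not from the original code, and the trick relies on the long-standing, though undocumented, behaviour that a parallel stream started from inside a ForkJoinPool task forks into that pool:

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class InnerStreamInCustomFjp {
  // Illustrative pool; two threads to mirror the fixed pool in the test.
  static final ForkJoinPool CUSTOM_FJP = new ForkJoinPool(2);

  static void processString(final String string) throws Exception {
    final List<Character> chars = string.chars()
        .mapToObj(c -> (char) c)
        .collect(Collectors.toList());
    // The submitted task runs on a CUSTOM_FJP worker, so the inner parallel
    // stream forks into CUSTOM_FJP and never competes for the common-pool
    // workers that are busy running the outer stream.
    CUSTOM_FJP.submit(() ->
        chars.parallelStream().forEach(ch -> processCharacter(string, ch))
    ).get();
  }

  static void processCharacter(final String str, final Character ch) {
    // placeholder for the real per-character work
  }
}

The outer stream could be moved into the same pool as well, which takes the common pool out of the picture entirely.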



--
Sent from: http://jsr166-concurrency.10961.n7.nabble.com/
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
Hi Akhil,

you might be onto something.  I would also expect that the two threads
should just carry on doing the work themselves if the common FJP is
occupied.  All my experiments have shown such behaviour.

I've reduced the code a bit to show this effect more consistently and to
remove as many distractions as possible:

import java.util.concurrent.*;
import java.util.stream.*;

public class ForkJoinTestBasic {
  static final ExecutorService pool = Executors.newFixedThreadPool(2);

  public static void main(String... args) throws InterruptedException {
    ForkJoinPool common = ForkJoinPool.commonPool();
    System.out.println("Forkjoin pool size: " +
ForkJoinPool.getCommonPoolParallelism());
    parallelStream().forEach(val -> process());
    pool.shutdown();
  }

  private static void process() {
    try {
      System.out.println("Processing: " + Thread.currentThread().getName());
      Runnable updateTask = () -> parallelStream().forEach(val -> { });
      Future<?> future = pool.submit(updateTask);
      System.out.println("Waiting: " + Thread.currentThread().getName());
      future.get(10, TimeUnit.HOURS);
    } catch (TimeoutException e) {
      System.err.println("Threads timed out");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  private static IntStream parallelStream() {
    return IntStream.range(0, Runtime.getRuntime().availableProcessors() * 8)
        .parallel();
  }
}

Both threads are stuck waiting on "externalAwaitDone()" after calling
invoke().

  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:330)
      at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:412)
      at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
      at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
      at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
      at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
      at java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
      at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
      at ForkJoinTestBasic.lambda$process$2(ForkJoinTestBasic.java:17)

In both cases, the state of the ForkJoinTask is 65536, meaning SIGNAL.
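
When the test does hang like this, one quick way to see that the common pool is saturated and that the inner root task is merely queued is to print the pool from a daemon watchdog thread; ForkJoinPool's toString() reports its parallelism, size, active and running counts, and the number of queued tasks and submissions. A small sketch, separate from the test itself:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class CommonPoolWatchdog {
  // Call once at the start of main() in the test being diagnosed.
  public static void start() {
    Thread watchdog = new Thread(() -> {
      try {
        while (true) {
          // prints a line like: ...[Running, parallelism = 3, size = 3,
          // active = 3, running = ..., steals = ..., tasks = ..., submissions = ...]
          System.err.println(ForkJoinPool.commonPool());
          TimeUnit.SECONDS.sleep(5);
        }
      } catch (InterruptedException e) {
        // stop quietly
      }
    }, "common-pool-watchdog");
    watchdog.setDaemon(true);
    watchdog.start();
  }
}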

Regards

Heinz
--
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java™ Specialists' Newsletter" - www.javaspecialists.eu
Java Champion - www.javachampions.org
JavaOne Rock Star Speaker
Tel: +30 69 75 595 262
Skype: kabutz



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
Hi Akhil,

I've been looking at this off-and-on over the last few days, whenever I
had a few minutes.  It is indeed interesting.  Of course we should not
block in tasks within a parallel stream.  But it should still work, and
usually does.  On my 1-2-2 machine, it fails rarely in Java 8 - about
1 in 1000 runs.  Java 9 and 10 fail more often, about a dozen times per
1000, and it seems to happen even more often since Java 11.

Obviously the FJP changed between Java 8 and 9 with the addition of
VarHandles, fences and opaque accesses.

I have not found the "smoking gun" yet, but am going to look at this
particular problem in my webinar tomorrow at 16:00 UTC -
https://www.javaspecialists.eu/webinars

We have some very smart people joining the webinar and perhaps we'll
figure it out collectively.

Regards

Heinz
--
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java™ Specialists' Newsletter" - www.javaspecialists.eu
Java Champion - www.javachampions.org
JavaOne Rock Star Speaker
Tel: +30 69 75 595 262
Skype: kabutz



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
One socket, two cores, 2 hyperthreads per core: 1-2-2 :-)

My server is a 2-4-1 machine.

Well, if there are no common-pool threads available, then usually the submitting thread does the work by itself.  You can easily verify this, even in JShell:

// Single threaded - only the main thread
jshell> IntStream.range(0, 1000).mapToObj(i -> Thread.currentThread()).collect(Collectors.toSet())
$1 ==> [Thread[main,5,main]]

// parallel - the main thread and the three common pool worker threads
jshell> IntStream.range(0, 1000).parallel().mapToObj(i -> Thread.currentThread()).collect(Collectors.toSet())
$2 ==> [Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-5,5,main], Thread[ForkJoinPool.commonPool-worker-7,5,main]]

// now we block up all the common pool threads
jshell> new Thread(() -> IntStream.range(0, 1000).parallel().forEach(i -> java.util.concurrent.locks.LockSupport.park())).start()

// and now when we go parallel, it still "works", but uses the submitting thread
jshell> IntStream.range(0, 1000).parallel().mapToObj(i -> Thread.currentThread()).collect(Collectors.toSet())
$4 ==> [Thread[main,5,main]]


Regards

Heinz
-- 
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java™ Specialists' Newsletter" - www.javaspecialists.eu
Java Champion - www.javachampions.org
JavaOne Rock Star Speaker
Tel: +30 69 75 595 262
Skype: kabutz


Olivier Peyrusse wrote:
Hello, 

I agree with Heinz's first analysis. You have a resource deadlock because the common pool, like any FJP, cannot detect that you are blocking its threads in Future#get.
The spare threads in your custom thread pool cannot help the FJP make progress; they can only submit more tasks that won't be executed.

The most surprising thing is that it often succeeds. Though I haven't run Heinz's sample yet, I suspect a race condition: all tasks are submitted to a shared work queue of the FJP and randomly stolen by its workers. My guess is that it blocks on the rare occasions when all "outer" tasks are started before any "inner" task starts.

Btw, @Heinz, what do you mean by your 1-2-2 machine? :-$

Cheers


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
Hi Dr. Heinz,


I tried your simplified example on my PC (1-4-2) and couldn't get it to deadlock. I then tried with java.util.concurrent.ForkJoinPool.common.parallelism=3 (which would be the default on your laptop) and immediately deadlocked it. Here's what I think happens:

- common FJ pool uses up all its threads to execute tasks in the outer parallel stream.
- those tasks execute the process() method, which first submits updateTask(s) to the fixed pool (non-blocking, as the pool has an unbounded queue) and then awaits their completion with future.get(), which blocks
- the tasks executing in the fixed pool spawn inner parallel stream tasks using ForkJoinTask.invoke() (the mechanism a parallel stream uses to submit its root task). This method calls FJT.doInvoke():

    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }

...this method first calls doExec():

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                completed = false;
                s = setExceptionalCompletion(rex);
            }
            if (completed)
                s = setDone();
        }
        return s;
    }

...and since the stack traces of the submitting threads (the fixed-pool threads) show that externalAwaitDone() is called afterwards, doExec() must have called exec() and exec() returned false.

We know that parallel streams are implemented with a special subclass of ForkJoinTask called CountedCompleter. Here's the CountedCompleter.exec() implementation:

    protected final boolean exec() {
        compute();
        return false;
    }

...which supports the above claim.

So far, the guts of CountedCompleter.compute() have executed, and the fixed-pool thread blocks in externalAwaitDone(). Let's look at the ForEachTask.compute() implementation that executed:

        // Similar to AbstractTask but doesn't need to track child tasks
        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }

The isShortCircuit flag evaluates to 'false' (checked with the debugger: helper.combinedFlags == 101).

There is a 'forkRight' flag that initially starts at false and then alternates in the while loop.

There is a 'task' local variable that initially starts with 'this'.

Observing the state of the ForEachTask instance while the thread is blocked in externalAwaitDone(), I can see 'targetSize' being 5 while sizeEstimate evaluates to 4, which means that this is now a "leaf" task (the arithmetic behind those two numbers is sketched below). But has it actually executed its split yet?
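
For reference, those two numbers follow from the stream's splitting heuristic. A back-of-the-envelope check, assuming the reduced test with availableProcessors() == 8 (so 64 stream elements) and the common-pool parallelism forced to 3:

// AbstractTask.suggestTargetSize(est) is est / LEAF_TARGET, minimum 1,
// where LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2.
long elements   = 8 * 8;                              // range(0, availableProcessors() * 8)
long leafTarget = 3 << 2;                             // parallelism 3  ->  12
long targetSize = Math.max(1, elements / leafTarget); // 64 / 12 = 5
// Each round of trySplit() roughly halves the remaining right split:
// 64 -> 32 -> 16 -> 8 -> 4.  At 4 <= 5 the task stops splitting: a "leaf".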

The task's compute() method has finished, but that doesn't mean it has already executed its "leaf" split and sent the elements to the lambda Consumer. Observing its state while the thread is blocked in externalAwaitDone(), I can see the 'completer' being null, which means that the ForEachTask was constructed with the following constructor:

        ForEachTask(PipelineHelper<T> helper,
                    Spliterator<S> spliterator,
                    Sink<S> sink) {
            super(null);
            this.sink = sink;
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = 0L;
        }

as the root task, which initializes the 'pending' count to 0. But I see the pending count being 2, so it must have been incremented afterwards: the task must have spawned off some children. A child is spawned by creating it and then calling fork() either on the child or on the parent(!), depending on the value of the alternating forkRight flag. Since I can also see the task still holding its spliterator, the final statements of compute():

            task.spliterator = null;
            task.propagateCompletion();

...have not been executed for this task yet, but for a child task. So fork() must have been called on this task.

ForkJoinTask.fork() javadocs say:

     * ... While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.

Does this also mean that fork() should not be called on a task that is in the middle of executing its compute() method? I don't know.

Since we are executing in a non-ForkJoinWorkerThread, fork() amounts to commonFJPool.externalPush(task). This method just pushes the task onto a (common) FJPool queue and eventually calls FJPool.signalWork(), which, I assume, does not add a worker thread if the pool already has its maximum number of threads, even when all of them are occupied. So the task we are calling externalAwaitDone() on has been swept into the FJPool queue in the hope that some FJPool thread will pick it up and (re)execute it. But all FJPool threads are busy waiting for results from fixed-pool tasks.
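
That queue-only behaviour can be shown in isolation. A minimal toy sketch (not the original test, and assuming nothing else in the JVM is using the common pool): park every common-pool worker inside a task, then fork() from the main thread and check whether the forked task ever runs:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;

public class ExternalForkOnlyEnqueues {
  public static void main(String[] args) throws InterruptedException {
    int parallelism = ForkJoinPool.getCommonPoolParallelism();

    // Occupy every common-pool worker with a task that never finishes.
    IntStream.range(0, parallelism).forEach(i ->
        ForkJoinPool.commonPool().execute(() -> {
          while (true) LockSupport.park();
        }));
    TimeUnit.MILLISECONDS.sleep(200);  // let the workers pick the blockers up

    // fork() from this external (non-worker) thread only pushes the task onto
    // a common-pool submission queue; no extra worker is created for it.
    ForkJoinTask<Void> task = new RecursiveAction() {
      @Override protected void compute() {
        System.out.println("ran on " + Thread.currentThread().getName());
      }
    };
    task.fork();

    TimeUnit.SECONDS.sleep(2);
    // Expected to print false: the task sits in the queue with nobody free to run it.
    System.out.println("forked task done? " + task.isDone());
  }
}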

Before the patch for 8020040, the ForEachTask.compute() looked like this:

        public void compute() {
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            while (true) {
                if (isShortCircuit && sink.cancellationRequested()) {
                    propagateCompletion();
                    spliterator = null;
                    return;
                }

                Spliterator<S> split;
                if (!AbstractTask.suggestSplit(spliterator, targetSize)
                    || (split = spliterator.trySplit()) == null) {
                    helper.copyInto(sink, spliterator);
                    propagateCompletion();
                    spliterator = null;
                    return;
                }
                else {
                    addToPendingCount(1);
                    new ForEachTask<>(this, split).fork();
                }
            }
        }

This guaranteed that only child tasks were forked off, while the root task's compute() always finished by clearing the spliterator and calling propagateCompletion() in the submitting thread. Would that variant prevent the deadlock? I don't think so, as the root task would still have a non-zero pending count, waiting for child tasks to decrement it and eventually complete it, while those child tasks would be sitting in the FJPool forever.

If all this is true, then this can be called resource exhaustion. If external threads execute FJTask.fork() while the FJPool is already fully occupied by other tasks, those newly forked tasks will only be queued, not executed by the external threads.

Could this be improved? Hm...

Regards, Peter

 



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Using custom ThreadPool inside parallelstream causing deadlock

JSR166 Concurrency mailing list
Hi Peter,

in our webinar yesterday afternoon, Henri Tremblay suggested we try using "Ordered".  When asked why this could possibly help, he mentioned that it uses a different task.  So we tried it out and indeed, I could not get it to fail.  We were also looking at SHORT_CIRCUIT, wondering whether that was the cause.
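
In the reduced ForkJoinTestBasic from earlier in the thread, that change amounts to swapping the terminal operation; a parallel forEachOrdered is driven by ForEachOps.ForEachOrderedTask rather than ForEachTask (a sketch of the one-line change):

// Inside process(): same pipeline, but ordered traversal uses a different task.
Runnable updateTask = () -> parallelStream().forEachOrdered(val -> { });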

How to fix it?  That's a good question :-)
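
One direction that seems worth testing (a sketch only; I have not verified that it untangles this exact scenario): if the outer tasks must block on a Future, let them block through ForkJoinPool.managedBlock(), so that a blocked common-pool worker can be compensated for with an extra thread. The helper class and its name below are illustrative:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public final class BlockingGet {
  private BlockingGet() {}

  // Waits for the future inside ForkJoinPool.managedBlock(), so a common-pool
  // worker that blocks here lets the pool create a compensating thread
  // instead of silently losing parallelism.
  public static <T> T get(final Future<T> future)
      throws InterruptedException, ExecutionException {
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
      @Override public boolean block() throws InterruptedException {
        try {
          if (!future.isDone()) future.get();
        } catch (ExecutionException e) {
          // the failure is re-reported by the final future.get() below
        }
        return true;
      }
      @Override public boolean isReleasable() {
        return future.isDone();
      }
    });
    return future.get();  // the future is done by now
  }
}

In the original processString(), future.get() would then become BlockingGet.get(future).
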
Regards

Heinz
-- 
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java™ Specialists' Newsletter" - www.javaspecialists.eu
Java Champion - www.javachampions.org
JavaOne Rock Star Speaker
Tel: +30 69 75 595 262
Skype: kabutz




_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest