Tracking task context across fork operations

JSR166 Concurrency mailing list
Hello,

I need to supply some ThreadLocal-style context to a
ForkJoinPool operation; specifically, a parallel stream operation.  I found
no way to make this work without modifying java.util.concurrent.  (If
you have a way, I'm all ears!)
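
To make the problem concrete, here is a minimal sketch of the context loss on an unmodified JDK.  The names ContextLossDemo, CONTEXT and "request-42" are made up for illustration; only ThreadLocal and ForkJoinPool are real API.  A value set on the submitting thread is not visible to a task that runs on a pool worker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

class ContextLossDemo {
    // Hypothetical per-thread context; stands in for any ThreadLocal-style state.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Returns the context value observed by a task running on a pool worker. */
    static String contextSeenByWorker() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            CONTEXT.set("request-42");            // set on the submitting thread only
            Callable<String> probe = CONTEXT::get;
            // The task runs on a ForkJoinWorkerThread, which has its own
            // (empty) ThreadLocal slot, so the caller's value is not seen.
            return pool.submit(probe).get();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("worker saw: " + contextSeenByWorker());  // prints "worker saw: null"
    }
}
```

With a parallel stream the caller has even less control, because the stream implementation creates and forks the subtasks itself.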

The modification I made was quite modest; it's in the patch below.
I'm wondering what folks think about making this change for real.
This could be a solution to the long-standing "task-local context"
problem.  It's effectively a one-line change that lets a
ForkJoinPool worker thread take some action whenever that thread
forks a task.

With this change, it becomes possible to create a ForkJoinPool with a
thread factory whose threads wrap every forked task in order to
propagate context from the forking thread (which already has the right
context) into the forked task (which can run on another thread).
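
As a rough sketch of the plumbing this relies on, the following builds a ForkJoinPool whose workers are instances of a custom ForkJoinWorkerThread subclass.  It runs on a stock JDK; ContextWorkerPools and ContextWorkerThread are hypothetical names.  The forkTask hook itself only exists with the patch applied, so the subclass body here is just a placeholder for where the override would go.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

class ContextWorkerPools {
    /** Hypothetical worker subclass; ForkJoinWorkerThread has a protected constructor. */
    static final class ContextWorkerThread extends ForkJoinWorkerThread {
        ContextWorkerThread(ForkJoinPool pool) {
            super(pool);
        }
        // With the proposed patch applied, the forkTask(ForkJoinTask<T>)
        // override would be placed here.  It is not part of the stock JDK.
    }

    /** Creates a pool whose worker threads are all ContextWorkerThread instances. */
    static ForkJoinPool newPool(int parallelism) {
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = ContextWorkerThread::new;
        // null handler and asyncMode=false match the defaults of the simpler constructors.
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    /** Returns true if a submitted task executes on one of the custom workers. */
    static boolean runsOnContextWorker() throws Exception {
        ForkJoinPool pool = newPool(2);
        try {
            Callable<Boolean> probe =
                () -> Thread.currentThread() instanceof ContextWorkerThread;
            return pool.submit(probe).get();
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that a parallel stream only uses such a pool when the terminal operation is invoked from inside a task running in that pool; otherwise it runs in the common pool.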

Here is the patch.  (It's based on Java 8, but the relevant code
hasn't changed.)

diff --git a/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java b/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java
index fd4bc819..0af40c2d 100755
--- a/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java
+++ b/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java
@@ -697,7 +697,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
     public final ForkJoinTask<V> fork() {
         Thread t;
         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
-            ((ForkJoinWorkerThread)t).workQueue.push(this);
+            ((ForkJoinWorkerThread)t).forkTask(this);
         else
             ForkJoinPool.common.externalPush(this);
         return this;
diff --git a/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java b/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
index d146f29f..b4abb8af 100755
--- a/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
@@ -170,6 +170,10 @@ public class ForkJoinWorkerThread extends Thread {
         }
     }

+    protected <T> void forkTask(ForkJoinTask<T> task) {
+       workQueue.push(task);
+    }
+
     /**
      * Erases ThreadLocals by nulling out Thread maps.
      */

The wrapping code looks like the following.  This doesn't need to be
in the class library; normal user code can do this.

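@Override
protected <T> void forkTask(ForkJoinTask<T> task) {
   // Capture the forking thread's context.  "context" is a ThreadLocal<C>.
   C inheritedContext = context.get();
   // Push a wrapper that installs that context around the original task.
   super.forkTask(ForkJoinTask.adapt(() -> {
      C oldContext = context.get();
      context.set(inheritedContext);
      try {
         return task.invoke();
      } finally {
         context.set(oldContext);  // restore what the running thread had before
      }
   }));
}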
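
For comparison, the same save/set/restore pattern can be applied by hand on an unmodified JDK to tasks that user code creates itself.  This is only a sketch with hypothetical names (ContextPropagation, CONTEXT, withCallerContext), and it does not reach tasks that a parallel stream forks internally, which is the case the patch is meant to cover.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

class ContextPropagation {
    // Hypothetical per-thread context, as in the snippet above.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Wraps body so it runs with the context of the thread that called this method. */
    static <T> Callable<T> withCallerContext(Callable<T> body) {
        String inherited = CONTEXT.get();      // captured on the wrapping thread
        return () -> {
            String old = CONTEXT.get();        // whatever the running thread had
            CONTEXT.set(inherited);
            try {
                return body.call();
            } finally {
                CONTEXT.set(old);              // restore on the way out
            }
        };
    }

    /** Submits a wrapped task and returns the context value it observed. */
    static String contextSeenByWrappedTask() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            CONTEXT.set("request-42");
            Callable<String> wrapped = withCallerContext(CONTEXT::get);
            return pool.submit(wrapped).get();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("wrapped task saw: " + contextSeenByWrappedTask());  // prints "wrapped task saw: request-42"
    }
}
```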

--
Patrick Doyle
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest