Hello,
I have a need to supply some ThreadLocal-style context to a ForkJoinPool operation; namely, a parallel stream operation. I found no way to make this work without modifying java.util.concurrent. (If you have a way, I'm all ears!) The modification I made was quite modest. It's in the patch below. I'm wondering what folks think about making this change for real? This could be a solution to the long-stanging "task-local context" problem. It's effectively a one-line change that allows a ForkJoinPool thread to take some action whenever a task is forked by that thread. With this change, it becomes possible to create a ForkJoinPool with a thread factory whose threads wrap every forked task in order to propagate context from the forking thread (which already has the right context) into the forked task (which can run on another thread). Here is the patch. (It's based on Java 8, but the relevant code hasn't changed.) diff --git a/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java b/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java index fd4bc819..0af40c2d 100755 --- a/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java +++ b/icepick/src/main/java/java/util/concurrent/ForkJoinTask.java @@ -697,7 +697,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) - ((ForkJoinWorkerThread)t).workQueue.push(this); + ((ForkJoinWorkerThread)t).forkTask(this); else ForkJoinPool.common.externalPush(this); return this; diff --git a/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java b/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java index d146f29f..b4abb8af 100755 --- a/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java +++ b/icepick/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java @@ -170,6 +170,10 @@ public class ForkJoinWorkerThread extends Thread { } } + protected <T> void forkTask(ForkJoinTask<T> task) { + workQueue.push(task); + } + /** * Erases ThreadLocals by nulling out Thread maps. */ The wrapping code looks like the following. This doesn't need to be in the class library; normal user code can do this. @Override protected <T> void forkTask(ForkJoinTask<T> task) { C inheritedContext = context.get(); // "context" is a ThreadLocal super.forkTask(ForkJoinTask.adapt(()->{ C oldContext = context.get(); context.set(inheritedContext); try { return task.invoke(); } finally { context.set(oldContext); } })); } -- Patrick Doyle [hidden email] _______________________________________________ Concurrency-interest mailing list [hidden email] http://cs.oswego.edu/mailman/listinfo/concurrency-interest |
Free forum by Nabble | Edit this page |