How to implement task-local variables (like thread local variables but for Fork/Join tasks)

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

How to implement task-local variables (like thread local variables but for Fork/Join tasks)

Per Mildner
I need something that should behave like a thread-local variable but for a tree of RecursiveAction tasks.

That is, a task inherits the task-local variables from the task that created it. Looking up a task-local variable is done by traversing the chain of parent tasks, looking for special task instances that holds task local values.

I currently implement this by letting all tasks know their parent (the “current task”) which is passed to their constructor. This way I can traverse the chain of parents towards the root of the task tree.

Keeping track of the current task is done by making all tasks set and restore themselves as the “current task” in a thread-local variable in their RecursiveAction.compute() but that seems wasteful since it is done also when the the task is running in the same thread as its parent task.

So, any ideas how to implement this efficiently?

Even better, is there a way to make something like this work with the default pool?

Regards,


Per Mildner
[hidden email]

(I sent this earlier but it never seemed to arrive at the list. Sorry if it gets duplicated.)


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: How to implement task-local variables (like thread local variables but for Fork/Join tasks)

Per Mildner
Ping?

> On 09 Jun 2016, at 22:56, Per Mildner <[hidden email]> wrote:
>
> I need something that should behave like a thread-local variable but for a tree of RecursiveAction tasks.
>
> That is, a task inherits the task-local variables from the task that created it. Looking up a task-local variable is done by traversing the chain of parent tasks, looking for special task instances that holds task local values.
>
> I currently implement this by letting all tasks know their parent (the “current task”) which is passed to their constructor. This way I can traverse the chain of parents towards the root of the task tree.
>
> Keeping track of the current task is done by making all tasks set and restore themselves as the “current task” in a thread-local variable in their RecursiveAction.compute() but that seems wasteful since it is done also when the the task is running in the same thread as its parent task.
>
> So, any ideas how to implement this efficiently?
>
> Even better, is there a way to make something like this work with the default pool?
>
> Regards,
>
>
> Per Mildner
> [hidden email]
>
> (I sent this earlier but it never seemed to arrive at the list. Sorry if it gets duplicated.)
>
>

Per Mildner                                     [hidden email]
Swedish Institute of Computer Science (SICS Swedish ICT)

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: How to implement task-local variables (like thread local variables but for Fork/Join tasks)

Dr Heinz M. Kabutz
Could you perhaps post some code demonstrating what you are trying to achieve?

On Monday, 8 August 2016, Per Mildner <[hidden email]> wrote:
Ping?

> On 09 Jun 2016, at 22:56, Per Mildner <<a href="javascript:;" onclick="_e(event, &#39;cvml&#39;, &#39;perm@sics.se&#39;)">perm@...> wrote:
>
> I need something that should behave like a thread-local variable but for a tree of RecursiveAction tasks.
>
> That is, a task inherits the task-local variables from the task that created it. Looking up a task-local variable is done by traversing the chain of parent tasks, looking for special task instances that holds task local values.
>
> I currently implement this by letting all tasks know their parent (the “current task”) which is passed to their constructor. This way I can traverse the chain of parents towards the root of the task tree.
>
> Keeping track of the current task is done by making all tasks set and restore themselves as the “current task” in a thread-local variable in their RecursiveAction.compute() but that seems wasteful since it is done also when the the task is running in the same thread as its parent task.
>
> So, any ideas how to implement this efficiently?
>
> Even better, is there a way to make something like this work with the default pool?
>
> Regards,
>
>
> Per Mildner
> <a href="javascript:;" onclick="_e(event, &#39;cvml&#39;, &#39;Per.Mildner@sics.se&#39;)">Per.Mildner@...
>
> (I sent this earlier but it never seemed to arrive at the list. Sorry if it gets duplicated.)
>
>

Per Mildner                                     <a href="javascript:;" onclick="_e(event, &#39;cvml&#39;, &#39;Per.Mildner@sics.se&#39;)">Per.Mildner@...
Swedish Institute of Computer Science (SICS Swedish ICT)

_______________________________________________
Concurrency-interest mailing list
<a href="javascript:;" onclick="_e(event, &#39;cvml&#39;, &#39;Concurrency-interest@cs.oswego.edu&#39;)">Concurrency-interest@...
http://cs.oswego.edu/mailman/listinfo/concurrency-interest


--
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java(tm) Specialists' Newsletter"
Sun/Oracle Java Champion
JavaOne Rockstar Speaker
http://www.javaspecialists.eu
Tel: +30 69 75 595 262
Skype: kabutz


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: How to implement task-local variables (like thread local variables but for Fork/Join tasks)

Per Mildner

On 08 Aug 2016, at 15:19, Dr Heinz M. Kabutz <[hidden email]> wrote:

Could you perhaps post some code demonstrating what you are trying to achieve?


The following is a simplified version of the real code.

1. Each (RecursiveAction task knows its parent, and
2. it is possible for a static method to access the “current task”, implemented by letting each compute() method set a thread-local variable.
3. Once you have the “current task” you can follow the parent chain and find the ancestor KeyValuePair tasks that hold key/value pairs for the current task-sub-tree.

This works (at least the real implementation in our code does) but I am a little worried about the overhead of try+getCurrentTask()+setCurrentTask() in each compute() method, especially since it must me done by all our compute() methods, also those that never (indirectly) calls getTaskLocalValue().

However, what I really would like to be able to do is to access these ancestor KeyValuePair tasks also when I have no control over the ForkJoinTask sub-class, e.g. when using the new Java 8 Collection.parallelStream et al.

I guess that ForkJoinPool already has  the necessary information but I see no way to access it.

Note that the callers of getTaskLocalValue() are typically far away from the code that creates and calls the ForkJoinTask sub-classes, and often called via code we can not modify, so we can not easily pass the key/value pairs as extra arguments.

The example Main sets task local variables when processing some of the arguments but does not demonstrate that the key/value pair can be seen from all tasks in a task sub-tree.

package tlv;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.function.IntConsumer;

@SuppressWarnings("WeakerAccess")
public class Main
{
    private static final int THRESHOLD = 2;
    /**
     * The {@link IntRangeTask} currently running its {@link IntRangeTask#compute()} in this thread (or its ancestor {@link TaskNode}). May be null.
     */
    private static final ThreadLocal<TaskNode> cCurrentTask = new ThreadLocal<>();
    private static final TaskNode NO_PARENT = null;


    /**
     * The task currently running in the thread, or null if none or unknown.
     */
    static TaskNode getCurrentTask()
    {
        return cCurrentTask.get();
    }

    static void setCurrentTask(TaskNode task)
    {
        cCurrentTask.set(task);
    }

    /**
     * Return the value associated with key in the task tree, closest ancestor first, or defaultValue if none.
     *
     * @param key          the key whose associated value is to be returned
     * @param defaultValue returned if key is not present
     * @return the value (possibly null) associated with key, or defaultValue if none
     * @see #withTaskLocal(Object, Object, Runnable)
     */
    // This is called fairly often.
    static Object getTaskLocalValue(Object key, Object defaultValue)
    {
        TaskNode task = getCurrentTask();

        // Go towards the task-tree root, looking for a value for key.
        while (task != null)
        {
            if (task instanceof KeyValuePair)
            {
                if (((KeyValuePair) task).getKey().equals(key))
                {
                    return ((KeyValuePair) task).getValue();
                }
            }
            task = task.getParent();
        }
        return defaultValue;
    }


    /**
     * @param key      the (non-null) value to bind.
     * @param value    the value.
     * @param runnable the runnable to run with the binding in effect.
     */
    // This is rarely called so it is not speed-critical.
    static void withTaskLocal(Object key, Object value, Runnable runnable)
    {
        KeyValuePair task = new KeyValuePair(key, value, getCurrentTask());
        TaskNode outerTask = getCurrentTask();
        try
        {
            setCurrentTask(task);
            runnable.run();
        }
        finally
        {
            setCurrentTask(outerTask);
        }
    }

    /**
     * Represents a node in the task-tree. Some nodes have associated key/value pairs.
     */
    interface TaskNode
    {
        /**
         * The parent of this task-tree node, or null if none.
         */
        TaskNode getParent();
    }

    // Similar to IncrementTask in RecursiveAction JavaDocs.
    static class IntRangeTask extends RecursiveAction implements TaskNode
    {
        private final TaskNode iParent;
        private final IntConsumer iOperation;
        private final int iLowInclusive;
        private final int iHighExclusive;

        IntRangeTask(IntConsumer operation, int lowInclusive, int highExclusive, TaskNode parent)
        {
            super();
            iParent = parent;
            iOperation = operation;
            iLowInclusive = lowInclusive;
            iHighExclusive = highExclusive;
        }

        @Override
        public TaskNode getParent()
        {
            return iParent;
        }

        // Called often, so I worry about the try+getCurrentTask()+setCurrentTask() overhead here.
        @Override
        protected void compute()
        {
            if (iHighExclusive - iLowInclusive < THRESHOLD)
            {
                TaskNode outerTask = getCurrentTask();

                try
                {
                    setCurrentTask(this);
                    for (int i = iLowInclusive; i < iHighExclusive; ++i)
                    {
                        iOperation.accept(i);
                    }
                }
                finally
                {
                    setCurrentTask(outerTask);
                }
            }
            else
            {
                int mid = iLowInclusive + ((iHighExclusive - iLowInclusive) >>> 1);
                TaskNode parent = getParent(); // could use 'this' here, instead.

                invokeAll(new IntRangeTask(iOperation, iLowInclusive, mid, parent),
                          new IntRangeTask(iOperation, mid, iHighExclusive, parent));
            }
        }
    }

    static class KeyValuePair implements TaskNode
    {
        private final Object iKey;
        private final Object iValue;
        private final TaskNode iParent;

        KeyValuePair(Object key, Object value, TaskNode parent)
        {
            assert key != null;
            iKey = key;
            iValue = value;
            iParent = parent;
        }

        @Override
        public TaskNode getParent()
        {
            return iParent;
        }

        Object getKey()
        {
            return iKey;
        }

        Object getValue()
        {
            return iValue;
        }
    }

    /* java tlv.Main a b c d e f g h i j k l m n o p q r s t u v a
    Round 0
    ...
    Round 1
    a-->a@Thread[main,5,main] Task local value for a
    b-->b@Thread[main,5,main] DefaultResult
    c-->c@Thread[main,5,main] DefaultResult
    d-->d@Thread[main,5,main] DefaultResult
    e-->e@Thread[main,5,main] DefaultResult
    f-->f@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    g-->g@Thread[ForkJoinPool.commonPool-worker-2,5,main] DefaultResult
    h-->h@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    i-->i@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    j-->j@Thread[ForkJoinPool.commonPool-worker-1,5,main] DefaultResult
    k-->k@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    l-->l@Thread[ForkJoinPool.commonPool-worker-1,5,main] Task local value for l
    m-->m@Thread[ForkJoinPool.commonPool-worker-1,5,main] DefaultResult
    n-->n@Thread[ForkJoinPool.commonPool-worker-1,5,main] DefaultResult
    o-->o@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    p-->p@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    q-->q@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    r-->r@Thread[ForkJoinPool.commonPool-worker-2,5,main] DefaultResult
    s-->s@Thread[ForkJoinPool.commonPool-worker-2,5,main] DefaultResult
    t-->t@Thread[ForkJoinPool.commonPool-worker-2,5,main] Task local value for t
    u-->u@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    v-->v@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    a-->a@Thread[ForkJoinPool.commonPool-worker-7,5,main] Task local value for a
    Round 2
    ...
    */
    public static void main(String[] args)
    {
        // These arguments will run with task local values.
        final String[] keys = {"a", "t", "l"};

        final String[] results = new String[args.length];

        final ForkJoinPool pool = ForkJoinPool.commonPool();

        for (int j = 0; j < 5; j++)
        {
            System.out.println("Round " + j);

            pool.invoke(new IntRangeTask((i) ->
                                         {
                                             final String arg = args[i];

                                             if (Arrays.asList(keys).contains(arg))
                                             {
                                                 withTaskLocal(arg, "Task local value for " + arg, () -> results[i] = doSomething(arg));
                                             }
                                             else
                                             {
                                                 results[i] = doSomething(arg);
                                             }
                                         },
                                         0, args.length, NO_PARENT));

            for (int i = 0; i < results.length; i++)
            {
                System.out.println(args[i] + "-->" + results[i]);
            }
        }
    }

    private static String doSomething(final String arg)
    {
        return arg + "@" + Thread.currentThread() + " " + getTaskLocalValue(arg, "DefaultResult");
    }
}



On Monday, 8 August 2016, Per Mildner <[hidden email]> wrote:
Ping?

> On 09 Jun 2016, at 22:56, Per Mildner <[hidden email]> wrote:
>
> I need something that should behave like a thread-local variable but for a tree of RecursiveAction tasks.
>
> That is, a task inherits the task-local variables from the task that created it. Looking up a task-local variable is done by traversing the chain of parent tasks, looking for special task instances that holds task local values.
>
> I currently implement this by letting all tasks know their parent (the “current task”) which is passed to their constructor. This way I can traverse the chain of parents towards the root of the task tree.
>
> Keeping track of the current task is done by making all tasks set and restore themselves as the “current task” in a thread-local variable in their RecursiveAction.compute() but that seems wasteful since it is done also when the the task is running in the same thread as its parent task.
>
> So, any ideas how to implement this efficiently?
>
> Even better, is there a way to make something like this work with the default pool?
>
> Regards,
...


-- 
Dr Heinz M. Kabutz (PhD CompSci)
Author of "The Java(tm) Specialists' Newsletter"
Sun/Oracle Java Champion
JavaOne Rockstar Speaker
http://www.javaspecialists.eu
Tel: +30 69 75 595 262
Skype: kabutz




Per Mildner




_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest