Spot the bug in stream processing, IntConsumer and Executor

KedarMhaswade
Perhaps I had a long day, so this might be a completely silly mistake, but I need congenial help to figure it out.

To demonstrate a race condition, I wrote the following program. The program has ten concurrent tasks that modify count, a volatile shared variable. If I use the increment task (line 39), I get different results (e.g. [2]) every time I run it, demonstrating the race condition.

However, if I use the incrementStream task instead (line 38), then the count variable is not updated at all. The output is like [1] every time. In a separate program not involving threads, I have verified that a lambda expression like incrementStream updates a member variable as expected.

What am I doing wrong?

Regards,
Kedar


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class RaceCondition {

    private static volatile int count = 0;

    public static void main(String[] args) {
        // update the shared variable traditionally
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
        };
        // update the shared variable as a side effect of an IntConsumer#accept
        Runnable incrementStream = () -> IntStream.rangeClosed(1, 100).forEach(i -> count++);
        ExecutorService exec = Executors.newCachedThreadPool(); // short lived tasks
        try {
            for (int i = 0; i < 10; i++) {
                // exec.execute(incrementStream); // line 38
                exec.execute(increment); // line 39
                System.out.println("task: " + i + " updates count to: " + count);
            }
        } finally {
            exec.shutdown();
            System.out.println("final: " + count);
        }
    }

}
[1]
task: 0 updates count to: 0
task: 1 updates count to: 0
task: 2 updates count to: 0
task: 3 updates count to: 0
task: 4 updates count to: 0
task: 5 updates count to: 0
task: 6 updates count to: 0
task: 7 updates count to: 0
task: 8 updates count to: 0
task: 9 updates count to: 0
final: 0
[2]
task: 0 updates count to: 0
task: 1 updates count to: 1000
task: 2 updates count to: 1000
task: 3 updates count to: 2000
task: 4 updates count to: 4027
task: 5 updates count to: 5000
task: 6 updates count to: 6000
task: 7 updates count to: 7005
task: 8 updates count to: 8000
task: 9 updates count to: 8185
final: 9381


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Spot the bug in stream processing, IntConsumer and Executor

Benjamin Manes
You need to wait until the executor has completed, or else the main method may complete prior to the task running.

exec.awaitTermination(1, TimeUnit.MINUTES);
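
For illustration, a minimal sketch of this suggestion, assuming the same ten tasks of 1000 increments each as in the original post. The class name AwaitDemo, the helper runAndWait, and the finished counter are hypothetical names added only to make it visible that every task has run by the time the final value is printed. The count variable is left racy on purpose, so the final value can still vary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitDemo {
    // Racy on purpose, as in the original post.
    static volatile int count = 0;
    // Hypothetical helper counter: how many tasks have run to completion.
    static final AtomicInteger finished = new AtomicInteger();

    // Submits ten tasks, then blocks until all of them have finished.
    // Returns true if the pool terminated before the timeout elapsed.
    static boolean runAndWait() {
        count = 0;
        finished.set(0);
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
            finished.incrementAndGet();
        };
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++)
            exec.execute(increment);
        exec.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            return exec.awaitTermination(1, TimeUnit.MINUTES); // wait for them to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean done = runAndWait();
        System.out.println("terminated: " + done + ", tasks finished: " + finished.get());
        System.out.println("final: " + count); // may be below 10000 because count++ is racy
    }
}
```

Note that shutdown() alone does not wait; it only prevents new submissions. The wait happens in awaitTermination.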


On Fri, Feb 24, 2017 at 8:53 PM, kedar mhaswade <[hidden email]> wrote:

Re: Spot the bug in stream processing, IntConsumer and Executor

Tom Lee
Hi Kedar,

Disclaimer: I'd normally say that having multiple threads increment a variable like this is a bad idea, but it sounds like you're just trying to explore the behavior of this race, right?

Benjamin's on the right track: you're seeing zeros because the code submitted to the executor hasn't run by the time you print out the "task ... updates count to ..." messages. Still, it's not a complete fix, and you'll very likely continue to see very different results between the two tasks even if you add an awaitTermination call after the shutdown() call. It probably also depends on how fast your machine is (e.g. perhaps on slower machines or with fewer cores you'd see similar output for both).

It's still kind of interesting why the exhibited behavior is so different. Here's a bit of a hint -- with the following code I get very similar output irrespective of whether I'm using "increment" or "incrementStream":
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class RaceCondition {

    private static volatile int count = 0;

    public static void main(String[] args) throws Exception {
        // warmup (slower)
        run(false);

        // do it for real (faster)
        count = 0;
        run(true);
    }

    private static void run(final boolean show) throws Exception {
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
        };

        Runnable incrementStream = () -> {
            IntStream.rangeClosed(1, 1000).forEach(i -> count++);
        };
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 10; i++) {
                exec.execute(increment);
                // exec.execute(incrementStream);
                if (show) System.out.println("task: " + i + " updates count to: " + count);
            }
        } finally {
            exec.shutdown();
            exec.awaitTermination(10, TimeUnit.SECONDS);
            if (show) System.out.println("final: " + count);
        }
    }
}
Weird right?

Another hint: I added a println() at the end of the incrementStream task in your original code ("hello" or something silly like that). I didn't see any "hello" messages until after all the "task ... updates count to ..." messages were displayed.

Without going too deep, I suspect all your incrementStream tasks are held up by class loading etc. Specifically, the stream APIs you're using in incrementStream pull in maybe 40 additional classes on an Oracle JVM. To see for yourself, run the JVM with -verbose:class and run both increment and incrementStream; notice that the latter does a bunch of extra work. Since classes are loaded on demand, that work needs to happen before your incrementStream tasks can run. That's why the warmup step above improves the situation.

Put another way: even though logically increment and incrementStream are doing something very similar, the latter has to do a bunch of additional work before any of the stuff that touches the count variable even gets to run. And if it's not obvious it should be noted that your scenario here is very small/fast and this effect will be less pronounced (but not entirely absent) in a larger test.
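
One rough way to observe the one-time startup cost described above is to time the same stream pipeline twice on a single thread. This is only a sketch: the class name StreamWarmupDemo and the helper timeOnce are hypothetical, and the measured numbers will differ between machines and JVMs, so no particular timings are claimed here. Running it with -verbose:class, as suggested above, shows which classes are loaded during the first call.

```java
import java.util.stream.IntStream;

class StreamWarmupDemo {
    static volatile int count = 0;

    // Runs the stream-based increment once and returns the elapsed time in nanoseconds.
    static long timeOnce() {
        long start = System.nanoTime();
        IntStream.rangeClosed(1, 1000).forEach(i -> count++);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long first = timeOnce();  // includes loading and initializing the stream classes
        long second = timeOnce(); // the classes are already loaded at this point
        System.out.println("first call:  " + first + " ns");
        System.out.println("second call: " + second + " ns");
        System.out.println("count: " + count); // single-threaded here, so no updates are lost
    }
}
```

The first call is typically the slower one, which would be consistent with the tasks starting late in the original program; a proper measurement would need a benchmark harness rather than System.nanoTime() around a single call.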

Cheers,
Tom


On Fri, Feb 24, 2017 at 9:07 PM, Benjamin Manes <[hidden email]> wrote:

Re: Spot the bug in stream processing, IntConsumer and Executor

David Holmes-6

Also note that count++ is not an atomic operation so you will be losing updates and that will also cause erratic results.
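
To make the lost update concrete: count++ on a volatile int is a read, an add, and a write, and volatile only guarantees visibility of each individual read and write. The sketch below replays, on a single thread, one interleaving that two threads could produce. The class name LostUpdateDemo and the variable names are hypothetical and only serve the illustration.

```java
class LostUpdateDemo {
    static volatile int count = 0;

    // Replays, on one thread, an interleaving that two threads A and B could produce
    // when each of them executes count++ once.
    static int replayInterleaving() {
        count = 0;
        int readByA = count;  // thread A reads 0
        int readByB = count;  // thread B reads 0 before A has written
        count = readByA + 1;  // thread A writes 1
        count = readByB + 1;  // thread B also writes 1, overwriting A's update
        return count;
    }

    public static void main(String[] args) {
        // Two increments were attempted, but only one is reflected in the result.
        System.out.println("after two increments: " + replayInterleaving()); // prints 1
    }
}
```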

 

David

 

From: Concurrency-interest [mailto:[hidden email]] On Behalf Of Tom Lee
Sent: Saturday, February 25, 2017 6:09 PM
To: Benjamin Manes <[hidden email]>
Cc: [hidden email]
Subject: Re: [concurrency-interest] Spot the bug in stream processing, IntConsumer and Executor

 


Re: Spot the bug in stream processing, IntConsumer and Executor

KedarMhaswade
In reply to this post by Tom Lee
Thanks Tom and Benjamin. Yes, I am trying to demonstrate a race condition, so yes, this code is for illustration purposes only.

And upon a closer look, I found the bug in my code:

        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
        };

        // update the shared variable as a side effect of an IntConsumer#accept
        Runnable incrementStream = () -> IntStream.rangeClosed(1, 100).forEach(i -> count++);


The stream-oriented task was getting a 100 instead of a 1000 :-P. Fixing this gives very similar results. Of course, it clearly demonstrates that count++ is not an atomic operation and some updates are lost as expected. I also verified that by using an AtomicInteger instead of a simple volatile, the final value of count is 10,000, always.
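
A minimal sketch of the AtomicInteger variant mentioned above, assuming ten tasks of 1000 increments each and a wait for the pool to terminate before reading the result. The class name AtomicCountDemo and the helper runOnce are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class AtomicCountDemo {
    // Runs ten tasks that each increment a shared AtomicInteger 1000 times
    // and returns the final value once all tasks have finished.
    static int runOnce() {
        AtomicInteger count = new AtomicInteger();
        Runnable incrementStream =
                () -> IntStream.rangeClosed(1, 1000).forEach(i -> count.incrementAndGet());
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++)
            exec.execute(incrementStream);
        exec.shutdown();
        try {
            if (!exec.awaitTermination(1, TimeUnit.MINUTES))
                throw new IllegalStateException("tasks did not finish in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count.get();
    }

    public static void main(String[] args) {
        // incrementAndGet is an atomic read-modify-write, so no update is lost.
        System.out.println("final: " + runOnce()); // prints final: 10000
    }
}
```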

Regards,
Kedar


On Sat, Feb 25, 2017 at 12:09 AM, Tom Lee <[hidden email]> wrote:

Re: Spot the bug in stream processing, IntConsumer and Executor

Tom Lee
FWIW I noticed that 100/1000 thing too, and still got "bad" results (all zeros) after changing the 100 to 1000.

On Feb 25, 2017 9:32 AM, "kedar mhaswade" <[hidden email]> wrote:
Thanks Tom and Benjamin. Yes, I am trying to demonstrate a race condition, so yes, this code is for illustration purposes only.

And upon a closer look, I found the bug in my code:

        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
        };

        // update the shared variable as a side effect of an IntConsumer#accept
        Runnable incrementStream = () -> IntStream.rangeClosed(1, 100).forEach(i -> count++);


The stream-oriented task was getting a 100 instead of a 1000 :-P. Fixing this gives very similar results. Of course, it clearly demonstrates that count++ is not an atomic operation and some updates are lost as expected. I also verified that by using an AtomicInteger instead of a simple volatile, the final value of count is 10,000, always.

Regards,
Kedar


On Sat, Feb 25, 2017 at 12:09 AM, Tom Lee <[hidden email]> wrote:
Hi Kedar,

Disclaimer: I'd generally say something about this approach to having multiple threads incrementing a variable being a bad idea in general, but it sounds like you're just trying to explore the behavior of this race right?

Benjamin's on the right track: you're seeing zeros because the code submitted to the executor hasn't run by the time you print out the "task ... updates count to ..." messages. Still, it's not a complete fix & you'll very likely continue to see very different results even if you add an awaitTermination call after the shutdown() call. Also probably depends on how fast your machine is etc. etc. too (e.g. perhaps on slower machines / fewer cores you'd see similar output for both).

It's still kind of interesting why the exhibited behavior is so different. Here's a bit of a hint -- with the following code I get very similar output irrespective of whether I'm using "increment" or "incrementStream":
private static volatile int count = 0;
public static void main(String[] args) throws Exception {

// warmup (slower)
run(false);

// do it for real (faster)
count = 0;
run(true);
}

private static void run(final boolean show) throws Exception {
Runnable increment = () -> {
for (int i = 0; i < 1000; i++)
count++;
};

Runnable incrementStream = () -> {
IntStream.rangeClosed(1, 1000).forEach(i -> count++);
};
ExecutorService exec = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
exec.execute(increment);
// exec.execute(incrementStream);
if (show) System.out.println("task: " + i + " updates count to: " + count);
}
}
finally {
exec.shutdown();
exec.awaitTermination(10, TimeUnit.SECONDS);
if (show) System.out.println("final: " + count);
}
}
Weird right?

Another hint: I added code to println() a message to the end of incrementStream() in your original code ("hello" or something silly like that). I didn't see any "hello" messages until after all the "task ... updates count to ..." messages were displayed.

Without going too deep, I suspect all your incrementStream() threads are held up by class loading etc. Specifically, the stream APIs you're using in incrementStream pull in maybe 40 additional classes on an Oracle JVM. To see for yourself, run the JVM with -verbose:class and run both increment and incrementStream -- notice the latter does a bunch of extra work. Since classes are sort of loaded on-demand, that work needs to happen before your incrementStream threads can run. Thus why the warmup step above improves the situation.

Put another way: even though logically increment and incrementStream are doing something very similar, the latter has to do a bunch of additional work before any of the stuff that touches the count variable even gets to run. And if it's not obvious it should be noted that your scenario here is very small/fast and this effect will be less pronounced (but not entirely absent) in a larger test.

Cheers,
Tom


On Fri, Feb 24, 2017 at 9:07 PM, Benjamin Manes <[hidden email]> wrote:
You need to wait until the executor has completed, or else the main method may complete prior to the task running.

exec.awaitTermination(1, TimeUnit.MINUTES);



_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest



Re: Spot the bug in stream processing, IntConsumer and Executor

KedarMhaswade

​Sorry for the delay, Tom.​

On Sat, Feb 25, 2017 at 9:37 AM, Tom Lee <[hidden email]> wrote:
FWIW I noticed that 100/1000 thing too, and still got "bad" results (all zeros) after changing the 100 to 1000.

I think there was another mistake in my original code: the main thread was printing the value of the volatile count, possibly before the submitted tasks had run. Perhaps, as you suggested, in the case of incrementStream, class loading or some other start-up cost causes the submitted tasks to take longer to start than with increment.

With the 100->1000 change, I also moved the print statement inside the task [1], and then I got comparable results for both tasks (albeit incorrect ones, because of the race owing to the non-atomic count++).


[1]

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class RaceCondition {

    private static volatile int count = 0;
    public static void main(String[] args) throws InterruptedException {
        // update the shared variable traditionally
        Runnable increment = () -> {
            System.out.println("count was: " + count + " in thread: " + Thread.currentThread());
            for (int i = 0; i < 1000; i++)
                count++;
            System.out.println("count updated to: " + count + " in thread: " + Thread.currentThread());
        };
        // update the shared variable as a side effect of an IntConsumer#accept
        Runnable incrementStream = () -> {
            System.out.println("count was: " + count + " in thread: " + Thread.currentThread());
            IntStream.rangeClosed(1, 1000).forEach(i -> count++);
            System.out.println("count updated to: " + count + " in thread: " + Thread.currentThread());
        };
        // a single non-atomic read-modify-write per task (not submitted below)
        Runnable bulkUpdate = () -> count += 1000;
        ExecutorService exec = Executors.newCachedThreadPool(); // short lived tasks
        try {
            for (int i = 0; i < 10; i++) {
                // exec.execute(incrementStream);
                exec.execute(increment);
                // statement printing the count was here previously
            }
        } finally {
            // shutdown() has to come first: without it the pool never terminates,
            // and awaitTermination() simply blocks for the full timeout
            exec.shutdown();
            exec.awaitTermination(5, TimeUnit.SECONDS);
            System.out.println("final: " + count);
        }
    }
}

On Feb 25, 2017 9:32 AM, "kedar mhaswade" <[hidden email]> wrote:
Thanks Tom and Benjamin. Yes, I am trying to demonstrate a race condition, so yes, this code is for illustration purposes only.

And upon a closer look, I found the bug in my code:

        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
        };

        // update the shared variable as a side effect of an IntConsumer#accept
        Runnable incrementStream = () -> IntStream.rangeClosed(1, 100).forEach(i -> count++);


The stream-oriented task was getting a 100 instead of a 1000 :-P. Fixing this gives very similar results. Of course, it clearly demonstrates that count++ is not an atomic operation and some updates are lost as expected. I also verified that by using an AtomicInteger instead of a simple volatile, the final value of count is 10,000, always.
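
For reference, the AtomicInteger variant mentioned above might look roughly like the following. This is only a sketch, not the code Kedar actually ran: the class name AtomicCountDemo and the helper runTasks are made up for illustration, and the stream-based task is used with the corrected bound of 1000.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class AtomicCountDemo {

    // Hypothetical helper: runs `tasks` tasks, each incrementing a shared
    // AtomicInteger `perTask` times, and returns the final value.
    static int runTasks(int tasks, int perTask) {
        AtomicInteger count = new AtomicInteger();
        // incrementAndGet() is an atomic read-modify-write, so no update is lost
        Runnable incrementStream =
                () -> IntStream.rangeClosed(1, perTask).forEach(i -> count.incrementAndGet());
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < tasks; i++) {
            exec.execute(incrementStream);
        }
        exec.shutdown(); // stop accepting tasks so the pool can terminate
        try {
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("final: " + runTasks(10, 1000)); // prints "final: 10000"
    }
}
```

The only change relative to the volatile version is that count++ (a separate read, add and write) becomes a single atomic incrementAndGet().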

Regards,
Kedar


On Sat, Feb 25, 2017 at 12:09 AM, Tom Lee <[hidden email]> wrote:
Hi Kedar,

Disclaimer: I'd normally say something about multiple threads incrementing a shared variable like this being a bad idea, but it sounds like you're just trying to explore the behavior of this race, right?

Benjamin's on the right track: you're seeing zeros because the code submitted to the executor hasn't run by the time you print out the "task ... updates count to ..." messages. Still, it's not a complete fix & you'll very likely continue to see very different results even if you add an awaitTermination call after the shutdown() call. Also probably depends on how fast your machine is etc. etc. too (e.g. perhaps on slower machines / fewer cores you'd see similar output for both).

It's still kind of interesting why the exhibited behavior is so different. Here's a bit of a hint -- with the following code I get very similar output irrespective of whether I'm using "increment" or "incrementStream":
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class RaceCondition {

    private static volatile int count = 0;

    public static void main(String[] args) throws Exception {
        // warmup (slower)
        run(false);

        // do it for real (faster)
        count = 0;
        run(true);
    }

    private static void run(final boolean show) throws Exception {
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++;
        };

        Runnable incrementStream = () -> {
            IntStream.rangeClosed(1, 1000).forEach(i -> count++);
        };
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 10; i++) {
                exec.execute(increment);
                // exec.execute(incrementStream);
                if (show) System.out.println("task: " + i + " updates count to: " + count);
            }
        }
        finally {
            exec.shutdown();
            exec.awaitTermination(10, TimeUnit.SECONDS);
            if (show) System.out.println("final: " + count);
        }
    }
}
Weird right?

Another hint: I added code to println() a message to the end of incrementStream() in your original code ("hello" or something silly like that). I didn't see any "hello" messages until after all the "task ... updates count to ..." messages were displayed.

Without going too deep, I suspect all your incrementStream() threads are held up by class loading etc. Specifically, the stream APIs you're using in incrementStream pull in maybe 40 additional classes on an Oracle JVM. To see for yourself, run the JVM with -verbose:class and run both increment and incrementStream -- notice the latter does a bunch of extra work. Since classes are loaded lazily, on demand, that work needs to happen before your incrementStream threads can run. That's why the warmup step above improves the situation.

Put another way: even though logically increment and incrementStream are doing something very similar, the latter has to do a bunch of additional work before any of the stuff that touches the count variable even gets to run. And if it's not obvious it should be noted that your scenario here is very small/fast and this effect will be less pronounced (but not entirely absent) in a larger test.
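
One rough way to observe the extra up-front work is to time the first and the second use of the stream pipeline in the same JVM. The sketch below is an illustration only: StreamWarmupDemo and timeStreamOnce are invented names, the first call typically also pays for lambda bootstrapping and interpreter/JIT effects rather than class loading alone, and the numbers will vary by machine and JVM. Running it with -verbose:class, as suggested above, shows which classes the first call pulls in.

```java
import java.util.stream.IntStream;

final class StreamWarmupDemo {

    // Plain field used as a side-effect target; single-threaded here, so no race.
    private static int sink = 0;

    // Hypothetical helper: runs the stream-based loop once and returns elapsed nanoseconds.
    static long timeStreamOnce() {
        long start = System.nanoTime();
        IntStream.rangeClosed(1, 1000).forEach(i -> sink++);
        return System.nanoTime() - start;
    }

    static int sink() {
        return sink;
    }

    public static void main(String[] args) {
        long first = timeStreamOnce();  // includes one-time start-up work for the stream classes
        long second = timeStreamOnce(); // same work again, with the classes already loaded
        System.out.println("first call (ns):  " + first);
        System.out.println("second call (ns): " + second);
    }
}
```

This is not a benchmark; it is only meant to make the one-time cost visible in a small program like the one discussed here.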

Cheers,
Tom


On Fri, Feb 24, 2017 at 9:07 PM, Benjamin Manes <[hidden email]> wrote:
You need to wait until the executor has completed, or else the main method may complete prior to the task running.

exec.awaitTermination(1, TimeUnit.MINUTES);
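
In context, the wait only helps if shutdown() has been called first and the count is read after awaitTermination() returns. A minimal sketch of that ordering follows, assuming the same volatile counter as in the original program; the class name WaitForTasksDemo and the helpers runAndWait and count are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class WaitForTasksDemo {

    private static volatile int count = 0;

    // Hypothetical helper: submits ten racy increment tasks and waits for them.
    // Returns true if the pool terminated within the timeout.
    static boolean runAndWait() {
        count = 0;
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++)
                count++; // still not atomic: some updates may be lost
        };
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            exec.execute(increment);
        }
        exec.shutdown(); // no new tasks; already submitted tasks still run
        try {
            return exec.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static int count() {
        return count;
    }

    public static void main(String[] args) {
        boolean finished = runAndWait();
        // Read only after the wait: every task has completed by now.
        System.out.println("finished: " + finished + ", final: " + count());
    }
}
```

The final value is still at most 10000 and usually lower because of the race; waiting only guarantees that the number printed reflects all ten tasks.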



Re: Spot the bug in stream processing, IntConsumer and Executor

Tom Lee
On Wed, Mar 1, 2017 at 11:00 AM, kedar mhaswade <[hidden email]> wrote:

​Sorry for the delay, Tom.​

Not at all.


On Sat, Feb 25, 2017 at 9:37 AM, Tom Lee <[hidden email]> wrote:
FWIW I noticed that 100/1000 thing too, and still got "bad" results (all zeros) after changing the 100 to 1000.

I think there was another mistake in my original code: the main thread was printing the value of the volatile count, possibly before the submitted tasks had run. Perhaps, as you suggested, in the case of incrementStream, class loading or some other start-up cost causes the submitted tasks to take longer to start than with increment.

With the 100->1000 change, I also moved the print statement inside the task [1], and then I got comparable results for both tasks (albeit incorrect ones, because of the race owing to the non-atomic count++).

Ah yep, that would probably explain it :)


