ForkJoinPool deadlock - bug?

JSR166 Concurrency mailing list
Dear experts,

We are using ForkJoinPool as part of a software project. In our integration tests, we occasionally observe deadlocks with the following characteristics:
- The ForkJoinPool stops executing new tasks.
- All worker threads are waiting inside of ForkJoinPool.managedBlock.
- The number of running threads is 0, the number of active threads is 1.
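
The symptoms above can be read off the pool's public monitoring methods. The following is only a minimal sketch for logging them from a test or a watchdog thread; the object name PoolDiagnostics and the method snapshot are hypothetical and not part of the original report. The counters are approximate by design, so a single snapshot is a hint rather than proof of a deadlock.

```scala
import java.util.concurrent.ForkJoinPool

// Hypothetical helper (not from the original report): formats the
// monitoring counters that ForkJoinPool exposes publicly.
object PoolDiagnostics {
  def snapshot(pool: ForkJoinPool): String =
    s"running=${pool.getRunningThreadCount} " +
      s"active=${pool.getActiveThreadCount} " +
      s"poolSize=${pool.getPoolSize} " +
      s"queuedSubmissions=${pool.getQueuedSubmissionCount} " +
      s"queuedTasks=${pool.getQueuedTaskCount}"
}
```

In the situation described here, one would expect such a snapshot to keep reporting running=0 and active=1 while queued submissions remain pending.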

After increasing the "parallelism" parameter, the problem still occurred. Increasing the "minimumRunnable" parameter from 1 to 2 made the problem disappear. So we have a workaround for the moment.
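
For readers who want to try the same workaround, here is a minimal sketch of constructing a pool with "minimumRunnable" set to 2, using the ten-argument constructor available since Java 9. The object name MinimumRunnableWorkaround and the values for corePoolSize, maximumPoolSize, and keepAliveTime are assumptions chosen for illustration, not the settings from our project.

```scala
import java.util.concurrent.{ForkJoinPool, TimeUnit}

// Hypothetical factory (illustration only): builds a pool whose
// minimumRunnable is 2 instead of the default 1.
object MinimumRunnableWorkaround {
  def newPool(parallelism: Int): ForkJoinPool =
    new ForkJoinPool(
      parallelism,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      null,             // no UncaughtExceptionHandler
      true,             // asyncMode, as in the test case
      0,                // corePoolSize (assumed value)
      0x7fff,           // maximumPoolSize (assumed value)
      2,                // minimumRunnable: the workaround
      null,             // saturate predicate: none
      60L,              // keepAliveTime (assumed value)
      TimeUnit.SECONDS
    )
}
```

Calling MinimumRunnableWorkaround.newPool(1) yields a pool comparable to the one in the test case, except for the higher minimumRunnable. Whether this merely hides the problem or truly avoids it is not established by our observations.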

Further investigations revealed that worker threads may be SIGNALLED or UNSIGNALLED. When a worker thread is UNSIGNALLED, it will be parked soon after. However, an UNSIGNALLED thread scans the work queues before being parked; so UNSIGNALLED worker threads may actually execute tasks. 

It seems that if an UNSIGNALLED thread executes "managedBlock", the RC variable may temporarily be too high by one, which ultimately results in the deadlock we have observed.

Below, you can find a unit test that reproduces the issue quite reliably (on my machine). It has been compiled with Scala 2.12.11 (I hope Scala is ok, if not I can convert it to Java) and executed with OpenJDK 11.0.6.

I kindly ask you to critically review my observations and let me know if they are correct. Also, if there is a bug in ForkJoinPool, how can I request a fix? Lastly, if this is not the right place to discuss such observations, I would be glad if you could point me to the right place.

Thank you very much in advance and have a nice day!

Matthias Schmalz

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, Semaphore, TimeUnit}

import org.scalatest.{Matchers, WordSpec}

/**
 * Demonstrates a deadlock in ForkJoinPool.
 *
 * Compiled with Scala 2.12.11
 * Executed with OpenJDK 11.0.6
 */
class ForkJoinPoolTest extends WordSpec with Matchers {

  lazy val forkJoinPool = new ForkJoinPool(
    1, // The problem can also be reproduced with parallelism = 2.
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    ((_, t) => t.printStackTrace()): Thread.UncaughtExceptionHandler,
    true
  )

  val MinDelayCycles = 50000
  val MaxDelayCycles = 100000
  val Iterations = MaxDelayCycles - MinDelayCycles

  "A task may run on an UNSIGNALLED work queue" in {
    val completedTasks = new Semaphore(0)
    val tasksOnUnsignalledWorkQueues = new AtomicInteger()

    for (delay <- MinDelayCycles until MaxDelayCycles) {
      forkJoinPool.execute(() => {
        if (!isSignalled) {
          tasksOnUnsignalledWorkQueues.getAndIncrement()
        }
        completedTasks.release()
      })

      busyWait(delay)
    }

    // All tasks scheduled. Awaiting termination.
    completedTasks.tryAcquire(Iterations, 10, TimeUnit.SECONDS) shouldBe true

    Console.err.println(s"Hits: ${tasksOnUnsignalledWorkQueues.get()}, Total: $Iterations")
    tasksOnUnsignalledWorkQueues.get() should be > 0
  }

  "If several blocking tasks run on SIGNALLED work queues, all tasks complete" in {
    val startedTasks = new Semaphore(0)
    val blocker = new Semaphore(0)
    val blockLimit = new AtomicInteger(100)

    for (delay <- MinDelayCycles until MaxDelayCycles) {
      forkJoinPool.execute(() => {
        startedTasks.release()
        if (isSignalled && blockLimit.getAndDecrement() > 0) {
          blocking {
            blocker.acquire()
          }
        }
      })

      busyWait(delay)
    }

    // As ForkJoinPool.managedBlock will create extra threads,
    // all tasks will eventually be started.
    startedTasks.tryAcquire(Iterations, 10, TimeUnit.SECONDS) shouldBe true

    // Cleanup for next test
    blocker.release(100)
  }

  "If several blocking tasks run on UNSIGNALLED work queues, the pool may deadlock" in {
    val runningTasks = new Semaphore(0)
    val blockers = new Semaphore(0)
    val blockLimit = new AtomicInteger(10)

    for (delay <- MinDelayCycles until MaxDelayCycles) {
      forkJoinPool.execute(() => {
        runningTasks.release()
        if (!isSignalled && blockLimit.getAndDecrement() > 0) {
          blocking {
            blockers.acquire()
          }
        }
      })

      busyWait(delay)
    }

    // PROBLEM: runningTasks can no longer be acquired, so ForkJoinPool is running out of threads.
    runningTasks.tryAcquire(Iterations, 10, TimeUnit.SECONDS) shouldBe false

    // Looking at the output, all threads are parked or inside of ForkJoinPool.managedBlock,
    // so apparently it failed to release an extra thread.
    printStackTraces()

    // The activeThreadCount seems to be off by one.
    forkJoinPool.getRunningThreadCount shouldBe 0
    forkJoinPool.getActiveThreadCount shouldBe 1
  }

  def busyWait(delay: Int): Long = {
    var result = 0L
    for (i <- 0 until delay) {
      result += i * i
    }
    result
  }

  def isSignalled: Boolean = {
    val workQueueField = classOf[ForkJoinWorkerThread].getDeclaredField("workQueue")
    workQueueField.setAccessible(true)
    val q = workQueueField.get(Thread.currentThread())

    val phaseField = Class.forName("java.util.concurrent.ForkJoinPool$WorkQueue").getDeclaredField("phase")
    phaseField.setAccessible(true)
    val phase = phaseField.get(q).asInstanceOf[Int]

    phase >= 0
  }

  def blocking(thunk: => Unit): Unit = {
    val blocker: ForkJoinPool.ManagedBlocker = new ForkJoinPool.ManagedBlocker {
      private[this] var done: Boolean = false
      override def block(): Boolean = {
        try {
          if (!done) {
            thunk
          }
        } finally {
          done = true
        }

        true
      }

      override def isReleasable: Boolean = done
    }
    ForkJoinPool.managedBlock(blocker)
  }

  private def printStackTraces(): Unit = {
    val traces = {
      import scala.collection.JavaConverters._
      Thread.getAllStackTraces.asScala.toMap
        .collect {
          case (thread, stackTrace) if thread.getName.startsWith("ForkJoinPool") =>
            s"$thread state=${thread.getState.toString}" + stackTrace.mkString("\n\t", "\n\t", "\n")
        }
        .mkString("\n")
    }
    Console.out.println(s"Here are the stack-traces of ForkJoinPool worker threads:\n$traces")
  }
}

--
Dr. Matthias Schmalz
Senior Software Engineer
e: [hidden email] 
Digital Asset (Switzerland) GmbH
Luggwegstrasse 9
8048 Zurich, Switzerland
digitalasset.com

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: ForkJoinPool deadlock - bug?

JSR166 Concurrency mailing list
Thanks for the report. It appears to be a bug. I'll investigate further.

On 10/21/20 10:52 AM, Matthias Schmalz via Concurrency-interest wrote:

> Dear experts,
>
> We are using ForkJoinPool as part of a software project. In our
> integration tests, we occasionally observe deadlocks with the
> following characteristics:
> - The ForkJoinPool stops executing new tasks.
> - All worker threads are waiting inside of ForkJoinPool.managedBlock.
> - The number of running threads is 0, the number of active threads is 1.
>
> After increasing the "parallelism" parameter, the problem still
> occurred. Increasing the "minimumRunnable" parameter from 1 to 2 made
> the problem disappear. So we have a workaround for the moment.