Question regarding usage of LockSupprt.park/unpark in custom queue implementation

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Question regarding usage of LockSupprt.park/unpark in custom queue implementation

alarmnummer
Hi,

I have a question regarding a multi producer single consumer queue implementation. The queue works on the principle of 2 stacks; when threads put an item it is put in a stack (single linked list nodes) so the items are in reverse. When an item is taken, the whole stack can be removed using a simple cas and in reverse order the items are put in an array. So the original order is being restored.

I know this MPSC-queue isn't the most efficient one and also it generates litter. The main question is regarding the correctness of the park/unpark. For example can it happen that the consumer isn't unparked.

I have had a long look at this code and I had a few more experienced guys looks at this code as well. Till so far we could not shoot a hole in the approach, but the underbelly feeling is still there. Could someone have a look at it and tell if this queue is broken or not?


public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private static final Node BLOCKED = new Node();
private static final int INITIAL_ARRAY_SIZE = 512;

private final AtomicReference<Node> putStackHead = new AtomicReference<Node>();

private final Thread owningThread;

// will only be used by the owningThread.
private Object[] array;
private int arrayIndex = -1;

public MPSCQueue(Thread owningThread) {
this.owningThread = owningThread;
this.array = new Object[INITIAL_ARRAY_SIZE];
}

@Override
public void clear() {
putStackHead.set(null);
}

@Override
public boolean offer(E value) {
if (value == null) {
throw new IllegalArgumentException("value can't be null");
}

AtomicReference<Node> head = putStackHead;
Node newHead = new Node();
newHead.value = value;

for (; ; ) {
Node oldHead = head.get();
if (oldHead == null || oldHead == BLOCKED) {
newHead.next = null;
newHead.size = 1;
} else {
newHead.next = oldHead;
newHead.size = oldHead.size + 1;
}

if (!head.compareAndSet(oldHead, newHead)) {
continue;
}

if (oldHead == BLOCKED) {
unpark(owningThread);
}

return true;
}
}

@Override
public E take() throws InterruptedException {
E item = next();
if (item != null) {
return item;
}

takeAll();
assert arrayIndex == 0;
assert array[arrayIndex] != null;

return next();
}

@Override
public E poll() {
E item = next();

if (item != null) {
return item;
}

if (!pollAll()) {
return null;
}

return next();
}

private E next() {
if (arrayIndex == -1) {
return null;
}

if (arrayIndex == array.length) {
arrayIndex = -1;
return null;
}

E item = (E) array[arrayIndex];
if (item == null) {
arrayIndex = -1;
return null;
}
array[arrayIndex] = null;
arrayIndex++;
return item;
}

public void takeAll() throws InterruptedException {
long iteration = 0;
AtomicReference<Node> head = putStackHead;
for (; ; ) {
if (owningThread.isInterrupted()) {
head.compareAndSet(BLOCKED, null);
throw new InterruptedException();
}

Node currentHead = head.get();

if (currentHead == null) {

// there is nothing to be take, so lets block.
if (!head.compareAndSet(null, BLOCKED)) {
continue;
}

park();
} else if (currentHead == BLOCKED) {
park();
} else {
if (!head.compareAndSet(currentHead, null)) {
continue;
}

initArray(currentHead);
break;
}
iteration++;
}
}

public boolean pollAll() {
AtomicReference<Node> head = putStackHead;
for (; ; ) {
Node headNode = head.get();
if (headNode == null) {
return false;
}

if (head.compareAndSet(headNode, null)) {
initArray(headNode);
return true;
}
}
}

private void initArray(Node head) {
int size = head.size;

assert head != BLOCKED;
assert size != 0;

Object[] drain = this.array;
if (size > drain.length) {
drain = new Object[head.size * 2];
this.array = drain;
}

for (int i = size - 1; i >= 0; i--) {
drain[i] = head.value;
head = head.next;
}

for (int k = 0; k < array.length; k++) {
if (array[k] == null) {
break;
}
}

arrayIndex = 0;
assert array[0] != null;
}

@Override
public int size() {
//todo: size can't be relied upon because this items which have been copied into the array, are not visible apart
//to the owning thread.
Node h = putStackHead.get();
return h == null ? 0 : h.size;
}

@Override
public boolean isEmpty() {
return size() == 0;
// throw new UnsupportedOperationException();
}

@Override
public void put(E e) throws InterruptedException {
offer(e);
}

@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public int remainingCapacity() {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(Collection<? super E> c) {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(Collection<? super E> c, int maxElements) {
throw new UnsupportedOperationException();
}

@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}

@Override
public E peek() {
throw new UnsupportedOperationException();
}

private static final class Node<E> {
Node next;
E value;
int size;
}
}

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Question regarding usage of LockSupprt.park/unpark in custom queue implementation

Martin Buchholz-3
We now put this into every class that uses park/unpark

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Question regarding usage of LockSupprt.park/unpark in custom queue implementation

Giambattista Bloisi
In reply to this post by alarmnummer

Hi,
     the only problem I can spot so far is that an offer after a clear could not wake up a parked consumer. So in the clear method it would be better to put BLOCKED reference in head field rather than null.

Regards,

Il 08/mar/2016 10:45 AM, "Peter Veentjer" <[hidden email]> ha scritto:
Hi,

I have a question regarding a multi producer single consumer queue implementation. The queue works on the principle of 2 stacks; when threads put an item it is put in a stack (single linked list nodes) so the items are in reverse. When an item is taken, the whole stack can be removed using a simple cas and in reverse order the items are put in an array. So the original order is being restored.

I know this MPSC-queue isn't the most efficient one and also it generates litter. The main question is regarding the correctness of the park/unpark. For example can it happen that the consumer isn't unparked.

I have had a long look at this code and I had a few more experienced guys looks at this code as well. Till so far we could not shoot a hole in the approach, but the underbelly feeling is still there. Could someone have a look at it and tell if this queue is broken or not?


public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private static final Node BLOCKED = new Node();
private static final int INITIAL_ARRAY_SIZE = 512;

private final AtomicReference<Node> putStackHead = new AtomicReference<Node>();

private final Thread owningThread;

// will only be used by the owningThread.
private Object[] array;
private int arrayIndex = -1;

public MPSCQueue(Thread owningThread) {
this.owningThread = owningThread;
this.array = new Object[INITIAL_ARRAY_SIZE];
}

@Override
public void clear() {
putStackHead.set(null);
}

@Override
public boolean offer(E value) {
if (value == null) {
throw new IllegalArgumentException("value can't be null");
}

AtomicReference<Node> head = putStackHead;
Node newHead = new Node();
newHead.value = value;

for (; ; ) {
Node oldHead = head.get();
if (oldHead == null || oldHead == BLOCKED) {
newHead.next = null;
newHead.size = 1;
} else {
newHead.next = oldHead;
newHead.size = oldHead.size + 1;
}

if (!head.compareAndSet(oldHead, newHead)) {
continue;
}

if (oldHead == BLOCKED) {
unpark(owningThread);
}

return true;
}
}

@Override
public E take() throws InterruptedException {
E item = next();
if (item != null) {
return item;
}

takeAll();
assert arrayIndex == 0;
assert array[arrayIndex] != null;

return next();
}

@Override
public E poll() {
E item = next();

if (item != null) {
return item;
}

if (!pollAll()) {
return null;
}

return next();
}

private E next() {
if (arrayIndex == -1) {
return null;
}

if (arrayIndex == array.length) {
arrayIndex = -1;
return null;
}

E item = (E) array[arrayIndex];
if (item == null) {
arrayIndex = -1;
return null;
}
array[arrayIndex] = null;
arrayIndex++;
return item;
}

public void takeAll() throws InterruptedException {
long iteration = 0;
AtomicReference<Node> head = putStackHead;
for (; ; ) {
if (owningThread.isInterrupted()) {
head.compareAndSet(BLOCKED, null);
throw new InterruptedException();
}

Node currentHead = head.get();

if (currentHead == null) {

// there is nothing to be take, so lets block.
if (!head.compareAndSet(null, BLOCKED)) {
continue;
}

park();
} else if (currentHead == BLOCKED) {
park();
} else {
if (!head.compareAndSet(currentHead, null)) {
continue;
}

initArray(currentHead);
break;
}
iteration++;
}
}

public boolean pollAll() {
AtomicReference<Node> head = putStackHead;
for (; ; ) {
Node headNode = head.get();
if (headNode == null) {
return false;
}

if (head.compareAndSet(headNode, null)) {
initArray(headNode);
return true;
}
}
}

private void initArray(Node head) {
int size = head.size;

assert head != BLOCKED;
assert size != 0;

Object[] drain = this.array;
if (size > drain.length) {
drain = new Object[head.size * 2];
this.array = drain;
}

for (int i = size - 1; i >= 0; i--) {
drain[i] = head.value;
head = head.next;
}

for (int k = 0; k < array.length; k++) {
if (array[k] == null) {
break;
}
}

arrayIndex = 0;
assert array[0] != null;
}

@Override
public int size() {
//todo: size can't be relied upon because this items which have been copied into the array, are not visible apart
//to the owning thread.
Node h = putStackHead.get();
return h == null ? 0 : h.size;
}

@Override
public boolean isEmpty() {
return size() == 0;
// throw new UnsupportedOperationException();
}

@Override
public void put(E e) throws InterruptedException {
offer(e);
}

@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public int remainingCapacity() {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(Collection<? super E> c) {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(Collection<? super E> c, int maxElements) {
throw new UnsupportedOperationException();
}

@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}

@Override
public E peek() {
throw new UnsupportedOperationException();
}

private static final class Node<E> {
Node next;
E value;
int size;
}
}

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest