Multi-Reader One-Writer Queue

Multi-Reader One-Writer Queue

Jean Morissette
Hi,
    I'm developing an application where a single writer thread must send
messages to many reader threads.  The particularity is that a message
must be read by all readers before being discarded.  Also, low memory
consumption is an important requirement.  To achieve that, I'm thinking
of creating an array-based custom queue where each reader is in fact a
queue proxy that keeps its "dequeue pointer" in the array.  Only the
last reader of an element will remove it from the array.  However, I'm
wondering how to efficiently determine which reader is the last one
during a dequeue operation.

Is this the best way to accomplish this?  Any advice would be appreciated.
Thanks,
-Jean


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: Multi-Reader One-Writer Queue

Brian Goetz
> However, I'm
> wondering how to determine efficiently who is the last reader during
> dequeue operation?

I think that's a task for the garbage collector.

> To achieve that, I'm thinking to create an array-based custom queue
> where each reader is in fact a queue proxy that keep its "dequeue
> pointer" in the array.

This will only work if you can ensure that the producer cannot outrun
the slowest consumer.  If you can't, then you have to make the
multi-insert operation block.

To do this, you could have a semaphore which represents the put credit,
and no master queue, but N array-based queues:

   Semaphore sem = new Semaphore(bound);

The put operation operates on all queues:

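   void put(T t) throws InterruptedException {
     sem.acquire();                  // blocks once `bound` messages are outstanding
     Node<T> n = new Node<T>(t, nQueues);
     for (BlockingQueue<Node<T>> q : queues)
       q.put(n);                     // the same node goes to every reader's queue
   }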

The get() operation operates on one queue, and the last of the get()
operations for a given node releases the semaphore permit:

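   T get() throws InterruptedException {
     Node<T> n = q.take();           // q is this reader's own queue
     int count = n.count.decrementAndGet();
     if (count == 0)                 // last reader of this node
       sem.release();
     return n.item;
   }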

   class Node<T> {
     final T item;
     final AtomicInteger count;

     public Node(T item, int count) {
       this.item = item;
       this.count = new AtomicInteger(count);
     }
   }
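
Putting the fragments above together, a minimal self-contained sketch might look like the following. The class name BroadcastQueue, the take(int reader) signature, and the availablePermits() helper are assumptions made for illustration, not part of the original suggestion; each reader is identified here simply by its index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical name: one writer, N readers, every message seen by all readers. */
class BroadcastQueue<T> {
    /** A message plus the number of readers that still have to consume it. */
    private static final class Node<T> {
        final T item;
        final AtomicInteger count;

        Node(T item, int count) {
            this.item = item;
            this.count = new AtomicInteger(count);
        }
    }

    private final Semaphore sem;  // the "put credit"
    private final List<BlockingQueue<Node<T>>> queues = new ArrayList<>();

    BroadcastQueue(int bound, int nReaders) {
        sem = new Semaphore(bound);
        for (int i = 0; i < nReaders; i++)
            queues.add(new ArrayBlockingQueue<>(bound));
    }

    /** Blocks while `bound` messages are still unread by at least one reader. */
    void put(T t) throws InterruptedException {
        sem.acquire();
        Node<T> n = new Node<>(t, queues.size());
        for (BlockingQueue<Node<T>> q : queues)
            q.put(n);  // has room: a queue only holds nodes whose permit is still held
    }

    /** Called by reader number `reader`; the last reader of a node frees its permit. */
    T take(int reader) throws InterruptedException {
        Node<T> n = queues.get(reader).take();
        if (n.count.decrementAndGet() == 0)
            sem.release();
        return n.item;
    }

    /** Exposed only so the sketch can be inspected. */
    int availablePermits() {
        return sem.availablePermits();
    }
}
```

Note that each per-reader queue stores only a reference to the shared Node, so the message itself is held once regardless of the number of readers, and the garbage collector reclaims it after the last reader drops its reference.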


Re: Multi-Reader One-Writer Queue

Gregg Wonderly-2


Brian Goetz wrote:

>> However, I'm
>> wondering how to determine efficiently who is the last reader during
>> dequeue operation?
>
> I think that's a task for the garbage collector.
>
>> To achieve that, I'm thinking to create an array-based custom queue
>> where each reader is in fact a queue proxy that keep its "dequeue
>> pointer" in the array.
>
> This will only work if you can ensure that the producer cannot outrun
> the slowest consumer.  If you can't, then you have to make the
> multi-insert operation block.
>
> To do this, you could have a semaphore which represents the put credit,
> and no master queue, but N array-based queues:

The other choice is to make the receivers be listeners in a list that the producer cycles through.  If the listeners
need to manage some queuing, they could use queues in their listener objects.  But, in the end, you have to be very
careful about unbounded production opportunities.  It can seem attractive to let them run asynchronously, but this can
be a big trap to fall into.

Asynchronous processing is good for unrelated activities, or when you are using multi-threading to hide latency.  But,
you have to be careful to still keep data producers and consumers in sync either by using bounded queues, or counting
semaphores or something that will keep the outstanding operations at some reasonable level.
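
As a rough illustration of the listener idea with bounded per-listener queues, here is a minimal sketch. All names (MessageListener, QueuedListener, Producer, publish, pending) are hypothetical and chosen only for this example; the bounded ArrayBlockingQueue is one assumed way of keeping the producer from running ahead of a slow listener.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical callback interface for receivers. */
interface MessageListener<T> {
    void onMessage(T msg) throws InterruptedException;
}

/** A listener that does its own queuing, with a fixed capacity. */
class QueuedListener<T> implements MessageListener<T> {
    private final BlockingQueue<T> inbox;

    QueuedListener(int capacity) {
        inbox = new ArrayBlockingQueue<>(capacity);
    }

    @Override
    public void onMessage(T msg) throws InterruptedException {
        inbox.put(msg);  // blocks the producer when this listener is `capacity` behind
    }

    /** Called by the listener's own consumer thread. */
    T take() throws InterruptedException {
        return inbox.take();
    }

    int pending() {
        return inbox.size();
    }
}

/** The producer cycles through its list of listeners for every message. */
class Producer<T> {
    private final List<MessageListener<T>> listeners = new CopyOnWriteArrayList<>();

    void addListener(MessageListener<T> l) {
        listeners.add(l);
    }

    void publish(T msg) throws InterruptedException {
        for (MessageListener<T> l : listeners)
            l.onMessage(msg);
    }
}
```

With this shape the slowest listener throttles publish(), so the number of outstanding messages stays bounded by the listener capacity instead of growing without limit.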

Gregg Wonderly