PriorityBlockingQueue question

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

PriorityBlockingQueue question

David Walend
I'm imitating the PriorityBlockingQueue code to add Conditions  
dynamically.

I ran across a line of code I don't understand. The member variable  
lock is final. The offer() method sets up a final local variable  
named lock, set to the same value. Nothing can change the final  
(short of the hack used by Serialization). The Condition notEmpty and  
the PriorityQueue q don't get the same treatment.

Do I need to imitate that line of code, or can I skip it?

Thanks,

Dave

     private final PriorityQueue<E> q;
     private final ReentrantLock lock = new ReentrantLock(true);
     private final Condition notEmpty = lock.newCondition();

...

     public boolean offer(E o) {
         if (o == null) throw new NullPointerException(); //needs a  
message
         final ReentrantLock lock = this.lock; //why is a local  
variable needed?
         lock.lock();
         try {
             boolean ok = q.offer(o);
             assert ok;
             notEmpty.signal();
             return true;
         } finally {
             lock.unlock();
         }
     }


David Walend
[hidden email]


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

Kasper Nielsen-2
David Walend wrote:

> I'm imitating the PriorityBlockingQueue code to add Conditions  
> dynamically.
>
> I ran across a line of code I don't understand. The member variable  
> lock is final. The offer() method sets up a final local variable  
> named lock, set to the same value. Nothing can change the final  
> (short of the hack used by Serialization). The Condition notEmpty and  
> the PriorityQueue q don't get the same treatment.
>
> Do I need to imitate that line of code, or can I skip it?

Take a look at the "[concurrency-interest] ThreadPoolExecutor implement
question!" thread that was posted a couple of days ago.

- Kasper
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

David Walend
Thanks, everyone, for the help so far.

I've got something coded up that should work, but isn't very  
satisfying. I did find a way to signal just the conditions that  
matter, but take() and poll(timeout) still have to scan back to find  
the right message.

I know there's still things to clean up, but I'm still trying to  
convince myself this is a good approach.

I'd like a way to clean up the scanForMatchingMessages() st/ it isn't  
scanning. Any ideas?

Code is at
https://somnifugijms.dev.java.net/source/browse/somnifugijms/v3/ 
source/somnifugi/net/walend/somnifugi/juc/
MessageSelectingPriorityBlockingQueue.java?rev=1.1&view=auto&content-
type=text/vnd.viewcvs-markup

Thanks again,

Dave

David Walend
[hidden email]


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

tpeierls
On 9/25/06, David Walend <[hidden email]> wrote:
I've got something coded up that should work, but isn't very
satisfying. I did find a way to signal just the conditions that
matter, but take() and poll(timeout) still have to scan back to find
the right message.

I know there's still things to clean up, but I'm still trying to
convince myself this is a good approach.

I'm trying to understand the code first. How can take() ever work with this code:

while(message == null)
{
    condition.await();
}
 
message is a local variable, so how can you wait for it to become non-null?


I'd like a way to clean up the scanForMatchingMessages() st/ it isn't
scanning. Any ideas?

No ideas for this -- I'm thinking about the other approach.

--tim

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

tpeierls
On 9/26/06, Tim Peierls <[hidden email]> wrote:
I'd like a way to clean up the scanForMatchingMessages() st/ it isn't
scanning. Any ideas?

No ideas for this -- I'm thinking about the other approach.

What if you maintained a separate queue for each selector and atomically marked messages when consumed? (You could use AtomicMarkableReference.attemptMark, for example.) Then you don't have the problem of having to remove a message from all other queues, since receivers can simply ignore messages that someone else marked.

Then take(selector) is just "take from queue associated with selector" -- more precisely, repeatedly take until you can atomically mark an unmarked message. I think you could use PBQs instead of PQs and a ConcurrentMap from selector to queue, avoiding the need for a global lock.

Not sure of the desired behavior for messages that don't match any currently waiting selector. Are they discarded? Left in their own queue for selector-less consumption? Or do they have to be scanned for matches each time you hear about a new selector? (In which case it might seem as though you're almost back to the other approach, but maybe without the need for a global lock.)

--tim

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

David Walend
In reply to this post by tpeierls
On Sep 26, 2006, at 10:35 AM, Tim Peierls wrote:
>
> I'm trying to understand the code first. How can take() ever work  
> with this code:
>
> while(message == null)
> {
>     condition.await();
> }

Arg. Missed a scan. Try

                     while(message == null)
                     {
                         condition.await();
                         message = scanForMatchingMessages
(messageSelector);
                     }

Thanks,

Dave

David Walend
[hidden email]


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

David Walend
In reply to this post by tpeierls

On Sep 26, 2006, at 11:11 AM, Tim Peierls wrote:

> On 9/26/06, Tim Peierls <[hidden email]> wrote:
> No ideas for this -- I'm thinking about the other approach.
>
> What if you maintained a separate queue for each selector and  
> atomically marked messages when consumed? (You could use  
> AtomicMarkableReference.attemptMark, for example.) Then you don't  
> have the problem of having to remove a message from all other  
> queues, since receivers can simply ignore messages that someone  
> else marked.

Thanks for the suggestion, Tim.

See https://somnifugijms.dev.java.net/source/browse/somnifugijms/v3/ 
source/somnifugi/net/walend/somnifugi/juc/
MessageSelectingPriorityBlockingQueue.java?rev=1.3&view=auto&content-
type=text/vnd.viewcvs-markup for a new version.

I like this one a lot. JMS QueueSenders handle the message selector  
workings inside offer(). JMS QueueReceivers with no message selector  
(and will use the ALLMESSAGESELECTOR) will have something close to  
PBQ performance. QueueReceivers with messages selectors will still  
have to do a PBQ scan in the remove() method, but that's the only  
disappointing thing. Everything else should be very live.

Thanks again for the help. Please let me know if you see any problems  
when you flip through the code.

Dave

David Walend
[hidden email]


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: PriorityBlockingQueue question

tpeierls
I had time for a quick scan only right now. I noticed a lock.lock()/unlock() pair that doesn't match the "approved" pattern: in getQueueForMessageSelector. Since you aren't using lockInterruptibly or any of the other features of RL, why not just use synchronized?

--tim

On 10/4/06, David Walend <[hidden email]> wrote:

On Sep 26, 2006, at 11:11 AM, Tim Peierls wrote:

> On 9/26/06, Tim Peierls <[hidden email]> wrote:
> No ideas for this -- I'm thinking about the other approach.
>
> What if you maintained a separate queue for each selector and
> atomically marked messages when consumed? (You could use
> AtomicMarkableReference.attemptMark, for example.) Then you don't
> have the problem of having to remove a message from all other
> queues, since receivers can simply ignore messages that someone
> else marked.

Thanks for the suggestion, Tim.

See https://somnifugijms.dev.java.net/source/browse/somnifugijms/v3/
source/somnifugi/net/walend/somnifugi/juc/
MessageSelectingPriorityBlockingQueue.java?rev=1.3&view=auto&content-
type=text/vnd.viewcvs-markup for a new version.

I like this one a lot. JMS QueueSenders handle the message selector
workings inside offer(). JMS QueueReceivers with no message selector
(and will use the ALLMESSAGESELECTOR) will have something close to
PBQ performance. QueueReceivers with messages selectors will still
have to do a PBQ scan in the remove() method, but that's the only
disappointing thing. Everything else should be very live.

Thanks again for the help. Please let me know if you see any problems
when you flip through the code.

Dave

David Walend
[hidden email]




_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest