ExecutorCompletionService is dangerous for critical app!?

ExecutorCompletionService is dangerous for critical app!?

yangjs
 
 
Hi all,

I use ExecutorCompletionService in my app, which follows a producer-consumer pattern. The code looks like this:

  public ExecutorCompletionService jobQueue = new ExecutorCompletionService<Document>(
   getThreadExecutor(), getBlockingQueue(100));

I found that when the completion queue is full, the finished job is discarded.
After reading the implementation, I see that ExecutorCompletionService implements QueueingFuture as follows:
 
    private class QueueingFuture extends FutureTask<V> {
        QueueingFuture(Callable<V> c) { super(c); }
        QueueingFuture(Runnable t, V r) { super(t, r); }
        protected void done() { completionQueue.add(this); }
    }
 
When the queue is full, completionQueue.add(this) throws an exception (IllegalStateException), but it is swallowed by the Future itself, so the completed task disappears. This is very dangerous for my critical app.
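
To illustrate the difference between the BlockingQueue insertion methods discussed here, the following is a minimal sketch using a plain ArrayBlockingQueue of capacity 1. The class and method names (QueueInsertDemo, addToFullQueue, and so on) are made up for this illustration and are not part of ExecutorCompletionService.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows how add(), offer() and timed offer()
// behave on a bounded queue that is already full.
class QueueInsertDemo {

    /** Returns a label describing what add() did on a full queue. */
    static String addToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.add("first");              // fits: capacity is 1
        try {
            queue.add("second");         // no space left
            return "added";
        } catch (IllegalStateException e) {
            return "IllegalStateException";
        }
    }

    /** offer() reports failure through its return value instead of throwing. */
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.add("first");
        return queue.offer("second");
    }

    /** Timed offer() waits up to the timeout for space, then gives up. */
    static boolean timedOfferToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.add("first");
        try {
            return queue.offer("second", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("add:         " + addToFullQueue());
        System.out.println("offer:       " + offerToFullQueue());
        System.out.println("timed offer: " + timedOfferToFullQueue());
    }
}
```

In short, add() fails fast with an unchecked exception, offer() reports failure through its return value, and only put() or a timed offer() waits for space, at the price of a checked InterruptedException.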
 
Why does it use completionQueue.add(this) rather than completionQueue.put(this), which is a blocking method? We want a throttling strategy when the queue is full. I will implement it myself, like this:
 
 
    private class QueueingFuture extends FutureTask<V> {
        QueueingFuture(Callable<V> c) { super(c); }
        QueueingFuture(Runnable t, V r) { super(t, r); }
        protected void done() { completionQueue.put(this); }
    }
 
 
Is this right?
 
 
 
Best regards,

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: ExecutorCompletionService is dangerous for critical app!?

Doug Lea
yangjs wrote:

> I use ExecutorCompletionService in my app, which follows a
> producer-consumer pattern. The code looks like this:
>
>     ExecutorCompletionService jobQueue = new
>     ExecutorCompletionService<Document>(
>        getThreadExecutor(), getBlockingQueue(100));
>
> When the queue is full, completionQueue.add(this) throws an exception,
> but it is swallowed by the Future itself.

Sorry that the javadoc did not make clear that the supplied queue
should be unbounded. This should/will be clarified. The rationale
for not using BlockingQueue.put here is that there is no
policy control in this simple class for dealing with either
InterruptedExceptions that might occur with put, or the potential
for saturation and lockup.

Unless/until we get good suggestions for dealing with
such things in a generic way, I'm afraid that in those cases
where you have to deal with them, you'll need a custom
implementation of the CompletionService interface.

-Doug
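
One possible way to throttle without blocking inside done() is sketched below. It is not proposed in this thread and assumes that limiting the number of submitted-but-unconsumed tasks is acceptable: the completion queue stays unbounded, as the reply above recommends, and the producer acquires a Semaphore permit before each submit. ThrottledSubmitDemo, run, and the pool size of 4 are hypothetical choices for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: throttle the producer with a Semaphore instead of
// bounding the completion queue.
class ThrottledSubmitDemo {

    /** Submits tasks 1..taskCount, never more than maxInFlight unconsumed, and returns their sum. */
    static int run(final int taskCount, int maxInFlight) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // This constructor uses an unbounded LinkedBlockingQueue internally.
        final CompletionService<Integer> service =
                new ExecutorCompletionService<Integer>(pool);
        final Semaphore permits = new Semaphore(maxInFlight);
        final AtomicInteger sum = new AtomicInteger();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < taskCount; i++) {
                        sum.addAndGet(service.take().get());
                        permits.release(); // one result consumed: allow one more submit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= taskCount; i++) {
                permits.acquire(); // blocks the producer, never a pool worker
                final int value = i;
                service.submit(new Callable<Integer>() {
                    public Integer call() { return value; }
                });
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            consumer.interrupt();
        } finally {
            pool.shutdown();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(run(10, 3));
    }
}
```

Because a permit is held from submit until the result is taken, at most maxInFlight results can sit in the completion queue at once, so the add() call in done() cannot fail for lack of space. The interrupt, if any, lands in the producer or consumer, where the application can decide what to do with it.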

Re: ExecutorCompletionService is dangerous for critical app!?

yangjs
    This is my custom implementation of the CompletionService interface.
Can I use this impl for my work?

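import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingCompletionService<V> implements CompletionService<V> {
 private final Executor executor;

 private final BlockingQueue<Future<V>> completionQueue;

 // If true, done() waits in put() when the queue is full; otherwise it uses add().
 private boolean isBlocking;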

 /**
  * FutureTask extension to enqueue upon completion
  */
 private class QueueingFuture extends FutureTask<V> {
  QueueingFuture(Callable<V> c) {
   super(c);
  }

  QueueingFuture(Runnable t, V r) {
   super(t, r);
  }

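  protected void done() {
   if (isBlocking) {
    try {
     // Blocks this worker thread until the consumer frees a slot.
     completionQueue.put(this);
    } catch (InterruptedException e) {
     // Restore the interrupt status. The task has already completed, so
     // cancel(true) has no effect; this future is simply never enqueued.
     Thread.currentThread().interrupt();
     cancel(true);
    }
   } else {
    // Throws IllegalStateException if a bounded queue is full.
    completionQueue.add(this);
   }
  }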
 }

 public BlockingCompletionService(Executor executor) {
  if (executor == null)
   throw new NullPointerException();
  this.executor = executor;
  this.completionQueue = new LinkedBlockingQueue<Future<V>>();
 }

 public BlockingCompletionService(Executor executor,
   BlockingQueue<Future<V>> completionQueue, boolean isBlocking) {
  if (executor == null || completionQueue == null)
   throw new NullPointerException();
  this.executor = executor;
  this.completionQueue = completionQueue;
  this.isBlocking = isBlocking;
 }

 public Future<V> submit(Callable<V> task) {
  if (task == null)
   throw new NullPointerException();
  QueueingFuture f = new QueueingFuture(task);
  executor.execute(f);
  return f;
 }

 public Future<V> submit(Runnable task, V result) {
  if (task == null)
   throw new NullPointerException();
  QueueingFuture f = new QueueingFuture(task, result);
  executor.execute(f);
  return f;
 }

 public Future<V> take() throws InterruptedException {
  return completionQueue.take();
 }

 public Future<V> poll() {
  return completionQueue.poll();
 }

 public Future<V> poll(long timeout, TimeUnit unit)
   throws InterruptedException {
  return completionQueue.poll(timeout, unit);
 }

}


----- Original Message -----
From: "Doug Lea" <[hidden email]>
To: "yangjs" <[hidden email]>
Cc: <[hidden email]>
Sent: Saturday, August 12, 2006 2:16 AM
Subject: Re: [concurrency-interest] ExecutorCompletionService is dangerous
for critical app!?


> yangjs wrote:
>>
>> I use ExecutorCompletionService in my app, which follows a
>> producer-consumer pattern. The code looks like this:
>>
>>     ExecutorCompletionService jobQueue = new
>>     ExecutorCompletionService<Document>(
>>        getThreadExecutor(), getBlockingQueue(100));
>>
>> When the queue is full, completionQueue.add(this) throws an exception,
>> but it is swallowed by the Future itself.
>
> Sorry that the javadoc did not make clear that the supplied queue
> should be unbounded. This should/will be clarified. The rationale
> for not using BlockingQueue.put here is that there is no
> policy control in this simple class for dealing with either
> InterruptedExceptions that might occur with put, or the potential
> for saturation and lockup.
>
> Unless/until we get good suggestions for dealing with
> such things in a generic way, I'm afraid that in those cases
> where you have to deal with them, you'll need a custom
> implementation of the CompletionService interface.
>
> -Doug
>

Re: ExecutorCompletionService is dangerous for critical app!?

Doug Lea
yangjs wrote:
>     This is my custom implementation of the CompletionService interface.
> Can I use this impl for my work?
>

Yes, this seems reasonable. You will still sometimes lose
tasks (on interrupt), but presumably your application can cope
with this.

-Doug


>
>   protected void done() {
>    if (isBlocking) {
>     try {
>      completionQueue.put(this);
>     } catch (InterruptedException e) {
>      Thread.currentThread().interrupt();
>      cancel(true);
>     }
>    } else {
>     completionQueue.add(this);
>    }
>   }
>  }
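
The loss on interrupt mentioned in the reply above can be reproduced with a small sketch. DroppingFuture repeats the blocking branch of the done() method quoted here; RetryingFuture is a hypothetical variant, not suggested by anyone in this thread, that keeps retrying put() and restores the interrupt status afterwards. All class and method names are made up for the illustration, and the interrupt is simulated by setting the flag on the current thread before running the task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical demo of what happens when done() is interrupted during put().
class InterruptedDoneDemo {
    private static final Callable<String> JOB = new Callable<String>() {
        public String call() { return "result"; }
    };

    /** done() as in the thread's blocking branch: gives up when put() is interrupted. */
    static final class DroppingFuture extends FutureTask<String> {
        private final BlockingQueue<Future<String>> queue;
        Boolean cancelReturned; // stays null unless done() calls cancel()

        DroppingFuture(BlockingQueue<Future<String>> queue) {
            super(JOB);
            this.queue = queue;
        }

        @Override protected void done() {
            try {
                queue.put(this);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                cancelReturned = cancel(true); // too late: the task already completed
            }
        }
    }

    /** Assumed alternative: retry put() until it succeeds, then re-assert the interrupt. */
    static final class RetryingFuture extends FutureTask<String> {
        private final BlockingQueue<Future<String>> queue;

        RetryingFuture(BlockingQueue<Future<String>> queue) {
            super(JOB);
            this.queue = queue;
        }

        @Override protected void done() {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        queue.put(this);
                        return;
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it and try again
                    }
                }
            } finally {
                if (interrupted) Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs the task on the current thread with its interrupt flag already set. */
    private static void runInterrupted(FutureTask<String> task) {
        Thread.currentThread().interrupt();
        try {
            task.run();
        } finally {
            Thread.interrupted(); // clear the flag so the caller is unaffected
        }
    }

    static String dropping() {
        BlockingQueue<Future<String>> queue = new ArrayBlockingQueue<Future<String>>(1);
        DroppingFuture task = new DroppingFuture(queue);
        runInterrupted(task);
        return "queued=" + queue.size() + " done=" + task.isDone()
                + " cancelled=" + task.isCancelled()
                + " cancelReturned=" + task.cancelReturned;
    }

    static String retrying() {
        BlockingQueue<Future<String>> queue = new ArrayBlockingQueue<Future<String>>(1);
        RetryingFuture task = new RetryingFuture(queue);
        runInterrupted(task);
        return "queued=" + queue.size() + " done=" + task.isDone();
    }

    public static void main(String[] args) {
        System.out.println(dropping());
        System.out.println(retrying());
    }
}
```

In the dropping case the future is complete and not cancelled, yet it never reaches the queue, so a consumer calling take() will not see it. The retrying variant avoids that particular loss, but it still holds a worker thread for as long as the queue stays full, which is the saturation and lockup risk raised earlier in the thread.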