Re: Help, how to design a pool of threads

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: Help, how to design a pool of threads

Andrej Navodnik
Hi all,

let's say that I did some homework ;-) Here is my
code:

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
  public static void main(String... args) {
    int N = 100;
    FindOptimalElement s = new FindOptimalElement(
        prepareTestData(N));
    System.out.println(
        "Waiting for optimal element...");
    Result r = s.get();
    System.out.println("Result: " + r);
    System.exit(0);
  }
 
  static List<Pair> prepareTestData(int n) {
    Random r = new Random();
    List<Pair> testData = new ArrayList<Pair>();
    for (int i = 0; i < n; i++) {
      double a = r.nextDouble();
      double b = r.nextDouble();
      testData.add(new Pair(a, b));
    }
    return testData;
  }
}

/**
 * Predicate deciding whether a candidate counts as an "optimal element".
 *
 * @param <E> type of the candidates being tested
 */
interface OptimalElement<E> {
  /**
   * Tests a single candidate.
   *
   * @param e the candidate to test
   * @return {@code true} if the implementation considers {@code e}
   *         optimal
   */
  boolean isOptimalElement(E e);
}

class FindOptimalElement {
  private static final int NUMBER_WORKERS = 10;
  private final AtomicBoolean optimalElementFound;
  private final AtomicInteger testedData;
  private final int numberOfData;
  private final BlockingQueue<Future<Result>> workers;
  private final ReentrantLock dataLock;
  private volatile Result result;
  private volatile Condition resultPrepared;
  private boolean resultDefined;
  private final ThreadPoolExecutor service;
 
  FindOptimalElement(List<Pair> data) {
    this.numberOfData = data.size();
    this.optimalElementFound =
        new AtomicBoolean(false);
    this.testedData = new AtomicInteger(0);
    this.dataLock = new ReentrantLock();
    this.resultPrepared = dataLock.newCondition();
    this.workers =
        new ArrayBlockingQueue<Future<Result>>(
        NUMBER_WORKERS);
   
    int corePoolSize = NUMBER_WORKERS;
    int maximumPoolSize = NUMBER_WORKERS;
    long keepAliveTime = 1;
    BlockingQueue<Runnable> workQueue =
        new LinkedBlockingQueue<Runnable>();
    service = new ThreadPoolExecutor(corePoolSize,
        maximumPoolSize, keepAliveTime,
        TimeUnit.SECONDS, workQueue);
    service.prestartAllCoreThreads();
   
    service.execute(new Producer(data));
    service.execute(new Consumer());
  }
 
  // how should I cancel/stop other threads,
  // is this possible??
  private void cancelRunnables(
      Runnable excludeRunnable) {
    BlockingQueue<Runnable> queue=service.getQueue();
    Runnable[] runnables = (Runnable[]) Array
        .newInstance(Runnable.class, NUMBER_WORKERS);
    queue.toArray(runnables);
    for (Runnable r : runnables) {
      System.out.println("Task: " + r);
      if (r != null && !r.equals(excludeRunnable)) {
        System.out.println("Removed task: " + r);
        service.remove(r);
      }
    }
    service.purge();
    System.out.println("Other threads in the pool " +
        "should be canceled/removed, are they?");
  }
 
  class Producer implements Runnable {
    private final List<Pair> testData;
    Producer(List<Pair> testData) {
      this.testData = testData;
    }
    public void run() {
      try {
        for (Pair p : testData) {
          Callable<Result> c = new Worker(p);
          Future<Result> task = service.submit(c);
          workers.put(task);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
      }
    }
  }
 
  class Consumer implements Runnable {
    public void run() {
      for (;;) {
        try {
          final Future<Result> f = workers.take();
          Runnable task = new PrepareResult(f);
          service.execute(task);
        } catch (InterruptedException ie) {
          ie.printStackTrace();
          return ;
        }
      }
    }
   
    class PrepareResult implements Runnable {
      private final Future<Result> f;
      PrepareResult(Future<Result> f) {
        this.f = f;
      }
      public void run() {
        try {
          Result r = f.get();
          dataLock.lock();
          try {
            if (r != null) {
              result = r;
              resultDefined = true;
              resultPrepared.signal();
            }
            if (!resultDefined) {
              int n = testedData.incrementAndGet();
              if (n==numberOfData) {
                System.out.println(
                    "All data tested, " +
                    "no optimal element found...");
                resultDefined = true;
                resultPrepared.signal();
              }
            }
          } finally {
            dataLock.unlock();
          }
         
        } catch (ExecutionException ee) {
          ee.printStackTrace();
        } catch (InterruptedException ie) {
          ie.printStackTrace();
        }
      }
    }
  }
 
  public Result get() {
    dataLock.lock();
    try {
      while (resultDefined == false)
        resultPrepared.await();
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    } finally {
      dataLock.unlock();
    }
    return result;
  }
 
  private class Worker implements Callable<Result>,
      OptimalElement<Pair> {
    private final Pair p;
   
    Worker(Pair p) {
      this.p = p;
    }
   
    public Result call() {
      if (optimalElementFound.get()) {
        return null;
      }
      System.out.println(
        "Seaching for optimal element...");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
     
      // check if the element satisfies intermediate
      // condition
      if (isOptimalElement(p)) {
        System.out.println(
          "Optimal element is found...");
        for(;;) {
          if (!optimalElementFound.get()) {
            optimalElementFound.compareAndSet(
              false, true);
            if (optimalElementFound.get()) {
              break;
            }
          } else
            return null;
        }
       
        // if proper element is found then
        // cancel other tasks,
        // actually this does not work
        System.out.println("Cancel other threads...");
        Thread currentThread = Thread.currentThread();
        cancelRunnables(currentThread);
       
        try {
          // simulate hard work, to find solution
          Thread.sleep(10000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
       
        // final phase, prepare result
        System.out.println(Thread.currentThread() +
            ": Returning optimal element...");
        return new Result(p, p.a+p.b);
      } else {
        try {
          // simulate useless job that should
          // not be executed
          Thread.sleep(2000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        if (optimalElementFound.get()) {
          System.out.println(Thread.currentThread() +
              ": Task is still doing useless " +
              "things...");
        }
        return null;
      }
    }
   
    public boolean isOptimalElement(Pair p) {
      return  p.a > 0.8 && p.b > 0.8;
      // simulate hard to find optimal element, result

      // should be null;
      //return  p.a > 0.99 && p.b > 0.99;
    }
  }
}


class Result {
  public final double sum;
  public final Pair p;
  Result(Pair p, double sum) {
    this.p = p;
    this.sum = sum;
  }
  public String toString() {
    return "Input: " + p + " ==> result: " + sum;
  }
}

/** Immutable pair of two {@code double} values. */
class Pair {
  /** First component. */
  public final double a;
  /** Second component. */
  public final double b;

  /**
   * @param a first component
   * @param b second component
   */
  Pair(double a, double b) {
    this.b = b;
    this.a = a;
  }

  /** Renders the pair as {@code [a: <a>, b: <b>]}. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("[a: ");
    text.append(a);
    text.append(", b: ");
    text.append(b);
    text.append("]");
    return text.toString();
  }
}


If the input data set contains optimal data then the
output of the program is the following:

Waiting for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Optimal element is found...
Cancel other threads...
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Other threads in the pool should be canceled/removed,
are they?
Thread[pool-1-thread-6,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-5,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-1,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-3,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-4,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-10,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-9,5,main]: Task is still doing  
             useless things...
Thread[pool-1-thread-7,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-8,5,main]: Returning optimal
element...
Result: Input: [a: 0.9554837437185378, b:
0.8581452336165145] ==> result: 1.8136289773350525

My questions are:
- how should I design my application so that the tasks
in the pool are canceled/stopped after the
optimal element is found, except for the task
that has found that “optimal element”? I wouldn't
like to see the message “Task is still doing useless
things”, because those tasks are using resources that
should be used for preparing the final result;
- should I be concerned about this problem or not?
- are there any other constructs that I could
use to achieve the same result?

Best regards,
Andrei

P.S.: Please don't laugh at my code, I'm still a
beginner as far as concurrent programming is
concerned...


               




               
__________________________________
Yahoo! FareChase: Search multiple travel sites in one click.
http://farechase.yahoo.com
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Re: Help, how to design a pool of threads

Doug Lea
Andrej Navodnik wrote:

>
> My questions are:
> - how should I design my application so that the tasks
>
> in the pool would be canceled/stopped after the
> optimal element is found except for the task
> that has found that “optimal element” - I wouldn't
> like to see the message “Task is still doing useless
> things” because it's actually using resources that
> should be used fior preparing the final result;
> - should I concern about this problem or not?
> - are there any other constructs that I could
> use and achieve the same result?
>

You might find the second code example in the ExecutorCompletionService
javadoc helpful. See

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html

-Doug


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Re: Help, how to design a pool of threads

Joe Bowbeer
In reply to this post by Andrej Navodnik
First, I'm trying to understand the threading scheme.  It looks like there is a producer that submits worker tasks to the pool and a consumer that processes the results.  I suggest you move the producer and consumer into the same thread.  Perhaps this could be the main thread.  That is, after you create the service, do the producer thing in the main thread next, submitting the worker tasks to the service; then do the consumer thing, processing the results.  If the main thread won't do, move the producer and consumer to a single background thread.
Moving the producer and consumer into a single thread leaves the thread pool dedicated to the worker tasks.  Easier to manage that way...
> My questions are:
> - how should I design my application so that the tasks
> in the pool would be canceled/stopped after the
> optimal element is found except for the task
> that has found that “optimal element” - I wouldn't
> like to see the message “Task is still doing useless
> things” because it's actually using resources that
> should be used fior preparing the final result;
FutureTasks are cancelled by calling task.cancel(true);
If the task is still in the queue, this will prevent the task from ever running.  If the task is already actively running, this will also interrupt the thread that's running the task.
But the task must be responsive to interrupts.  That is, when interrupted, the task should promptly quit.  If the task could be waiting on a lock when it is interrupted, I recommend using lockInterruptibly().  If the task sleeps, then don't catch the InterruptedException, just let the task (ie, Callable.call) throw InterruptedException.  Finally, if the task is looping without blocking then it can check isCancelled() to see if it's been cancelled.
Your cancelRunnables method removes tasks from the service's queue but does not attempt to cancel running tasks.  Would it work better to cancel all the tasks in the workers list?  If the service is dedicated to workers, though, then service.shutdownNow will do this for you.  It cancels all active tasks.
> - should I concern about this problem or not?
Yes.
> - are there any other constructs that I could
> use and achieve the same result?
>
Use a completion service work for the consumer thing?
Joe.

On 11/5/05, Andrej Navodnik <[hidden email]> wrote:> Hi all,>> let's say that I did some homework ;-) Here is my> code:>> import java.lang.reflect.Array;> import java.util.ArrayList;> import java.util.Iterator;> import java.util.List;> import java.util.Random;> import java.util.concurrent.ArrayBlockingQueue;> import java.util.concurrent.BlockingQueue;> import java.util.concurrent.Callable;> import java.util.concurrent.ExecutionException;> import java.util.concurrent.ExecutorService;> import java.util.concurrent.Executors;> import java.util.concurrent.Future;> import java.util.concurrent.LinkedBlockingQueue;> import java.util.concurrent.ThreadPoolExecutor;> import java.util.concurrent.TimeUnit;> import java.util.concurrent.atomic.AtomicBoolean;> import java.util.concurrent.atomic.AtomicInteger;> import java.util.concurrent.locks.Condition;> import java.util.concurrent.locks.ReentrantLock;>> public class Test {>   public static void main(String... args) {>     int N = 100;>     FindOptimalElement s = new FindOptimalElement(>         prepareTestData(N));>     System.out.println(>         "Waiting for optimal element...");>     Result r = s.get();>     System.out.println("Result: " + r);>     System.exit(0);>   }>>   static List<Pair> prepareTestData(int n) {>     Random r = new Random();>     List<Pair> testData = new ArrayList<Pair>();>     for (int i = 0; i < n; i++) {>       double a = r.nextDouble();>       double b = r.nextDouble();>       testData.add(new Pair(a, b));>     }>     return testData;>   }> }>> interface OptimalElement<E> {>   boolean isOptimalElement(E e);> }>> class FindOptimalElement {>   private static final int NUMBER_WORKERS = 10;>   private final AtomicBoolean optimalElementFound;>   private final AtomicInteger testedData;>   private final int numberOfData;>   private final BlockingQueue<Future<Result>> workers;>   private final ReentrantLock dataLock;>   private volatile Result result;>   private volatile Condition resultPrepared;>   private 
boolean resultDefined;>   private final ThreadPoolExecutor service;>>   FindOptimalElement(List<Pair> data) {>     this.numberOfData = data.size();>     this.optimalElementFound =>         new AtomicBoolean(false);>     this.testedData = new AtomicInteger(0);>     this.dataLock = new ReentrantLock();>     this.resultPrepared = dataLock.newCondition();>     this.workers =>         new ArrayBlockingQueue<Future<Result>>(>         NUMBER_WORKERS);>>     int corePoolSize = NUMBER_WORKERS;>     int maximumPoolSize = NUMBER_WORKERS;>     long keepAliveTime = 1;>     BlockingQueue<Runnable> workQueue =>         new LinkedBlockingQueue<Runnable>();>     service = new ThreadPoolExecutor(corePoolSize,>         maximumPoolSize, keepAliveTime,>         TimeUnit.SECONDS, workQueue);>     service.prestartAllCoreThreads();>>     service.execute(new Producer(data));>     service.execute(new Consumer());>   }>>   // how should I cancel/stop other threads,>   // is this possible??>   private void cancelRunnables(>       Runnable excludeRunnable) {>     BlockingQueue<Runnable> queue=service.getQueue();>     Runnable[] runnables = (Runnable[]) Array>         .newInstance(Runnable.class, NUMBER_WORKERS);>     queue.toArray(runnables);>     for (Runnable r : runnables) {>       System.out.println("Task: " + r);>       if (r != null && !r.equals(excludeRunnable)) {>         System.out.println("Removed task: " + r);>         service.remove(r);>       }>     }>     service.purge();>     System.out.println("Other threads in the pool " +>         "should be canceled/removed, are they?");>   }>>   class Producer implements Runnable {>     private final List<Pair> testData;>     Producer(List<Pair> testData) {>       this.testData = testData;>     }>     public void run() {>       try {>         for (Pair p : testData) {>           Callable<Result> c = new Worker(p);>           Future<Result> task = service.submit(c);>           workers.put(task);>         }>       } catch 
(InterruptedException ie) {>         ie.printStackTrace();>       }>     }>   }>>   class Consumer implements Runnable {>     public void run() {>       for (;;) {>         try {>           final Future<Result> f = workers.take();>           Runnable task = new PrepareResult(f);>           service.execute(task);>         } catch (InterruptedException ie) {>           ie.printStackTrace();>           return ;>         }>       }>     }>>     class PrepareResult implements Runnable {>       private final Future<Result> f;>       PrepareResult(Future<Result> f) {>         this.f = f;>       }>       public void run() {>         try {>           Result r = f.get();>           dataLock.lock();>           try {>             if (r != null) {>               result = r;>               resultDefined = true;>               resultPrepared.signal();>             }>             if (!resultDefined) {>               int n = testedData.incrementAndGet();>               if (n==numberOfData) {>                 System.out.println(>                     "All data tested, " +>                     "no optimal element found...");>                 resultDefined = true;>                 resultPrepared.signal();>               }>             }>           } finally {>             dataLock.unlock();>           }>>         } catch (ExecutionException ee) {>           ee.printStackTrace();>         } catch (InterruptedException ie) {>           ie.printStackTrace();>         }>       }>     }>   }>>   public Result get() {>     dataLock.lock();>     try {>       while (resultDefined == false)>         resultPrepared.await();>     } catch (InterruptedException ex) {>       ex.printStackTrace();>     } finally {>       dataLock.unlock();>     }>     return result;>   }>>   private class Worker implements Callable<Result>,>       OptimalElement<Pair> {>     private final Pair p;>>     Worker(Pair p) {>       this.p = p;>     }>>     public Result call() {>       if (optimalElementFound.get()) {>     
    return null;>       }>       System.out.println(>         "Seaching for optimal element...");>       try {>         Thread.sleep(1000);>       } catch (InterruptedException ex) {>         ex.printStackTrace();>       }>>       // check if the element satisfies intermediate>       // condition>       if (isOptimalElement(p)) {>         System.out.println(>           "Optimal element is found...");>         for(;;) {>           if (!optimalElementFound.get()) {>             optimalElementFound.compareAndSet(>               false, true);>             if (optimalElementFound.get()) {>               break;>             }>           } else>             return null;>         }>>         // if proper element is found then>         // cancel other tasks,>         // actually this does not work>         System.out.println("Cancel other threads...");>         Thread currentThread = Thread.currentThread();>         cancelRunnables(currentThread);>>         try {>           // simulate hard work, to find solution>           Thread.sleep(10000);>         } catch (InterruptedException ex) {>           ex.printStackTrace();>         }>>         // final phase, prepare result>         System.out.println(Thread.currentThread() +>             ": Returning optimal element...");>         return new Result(p, p.a+p.b);>       } else {>         try {>           // simulate useless job that should>           // not be executed>           Thread.sleep(2000);>         } catch (InterruptedException ex) {>           ex.printStackTrace();>         }>         if (optimalElementFound.get()) {>           System.out.println(Thread.currentThread() +>               ": Task is still doing useless " +>               "things...");>         }>         return null;>       }>     }>>     public boolean isOptimalElement(Pair p) {>       return  p.a > 0.8 && p.b > 0.8;>       // simulate hard to find optimal element, result>>       // should be null;>       //return  p.a > 0.99 && p.b > 0.99;>     }>   
}> }>>> class Result {>   public final double sum;>   public final Pair p;>   Result(Pair p, double sum) {>     this.p = p;>     this.sum = sum;>   }>   public String toString() {>     return "Input: " + p + " ==> result: " + sum;>   }> }>> class Pair {>   public final double a;>   public final double b;>   Pair(double a, double b) {>     this.a = a;>     this.b = b;>   }>   public String toString() {>     return "[a: " + a + ", b: " + b + "]";>   }> }>>> If the input data set contains optimal data then the> output of the program is the following:>> Waiting for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Seaching for optimal element...> Optimal element is found...> Cancel other threads...> Task: null> Task: null> Task: null> Task: null> Task: null> Task: null> Task: null> Task: null> Task: null> Task: null> Other threads in the pool should be canceled/removed,> are they?> Thread[pool-1-thread-6,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-5,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-1,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-3,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-4,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-10,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-9,5,main]: Task is still doing>              useless things...> Thread[pool-1-thread-7,5,main]: Task is still doing> useless things...> Thread[pool-1-thread-8,5,main]: Returning optimal> element...> Result: Input: [a: 0.9554837437185378, b:> 0.8581452336165145] ==> result: 1.8136289773350525>> My questions are:> - how should I design my application so that the tasks>> in the pool 
would be canceled/stopped after the> optimal element is found except for the task> that has found that “optimal element” - I wouldn't> like to see the message “Task is still doing useless> things” because it's actually using resources that> should be used fior preparing the final result;> - should I concern about this problem or not?> - are there any other constructs that I could> use and achieve the same result?>> Best regards,> Andrei>> P.S.: Please don't laugh at my code, I'm still> beginner as far as concurrent programming is> concerned...>
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Re: Help, how to design a pool of threads

Andrej Navodnik
In reply to this post by Doug Lea
Hi Doug,

thank you very much for your quick response.

I have already studied the example program you
suggested, but if I understand it correctly the threads
in the pool are canceled/stopped AFTER the final
result is found/prepared. Actually, I'd like to have a
pool where threads could be canceled/stopped BEFORE
one of the threads prepares the final result.

Suppose that the algorithm is testing different
variants for the next step and that this phase must
also produce some data for the next step. Suppose
also that during this step, let's say at approximately
1/3 to 1/5 of the total work, it must decide
whether the input data would produce useful results or
not. If the input data could not produce a useful result
then the algorithm should try with other parameters.
But if the algorithm can predict that the
result could be useful then I'd like to stop the other
threads because they are using valuable resources
(CPU) and I'd like to give priority only to the thread
that is on the right path. So, I'd like to test some
kind of an intermediate condition and AFTER this
condition there would be only one working thread in
the pool which would produce the result. I'd like to
implement a strategy where only the owner of the right
token is given the right to run in the final.

Best regards,
Andrei


--- Doug Lea <[hidden email]> wrote:

> Andrej Navodnik wrote:
> >
> > My questions are:
> > - how should I design my application so that the
> tasks
> >
> > in the pool would be canceled/stopped after the
> > optimal element is found except for the task
> > that has found that “optimal element” - I wouldn't
>
> > like to see the message “Task is still doing
> useless
> > things” because it's actually using resources that
> > should be used fior preparing the final result;
> > - should I concern about this problem or not?
> > - are there any other constructs that I could
> > use and achieve the same result?
> >
>
> You might find the second code example in the
> ExecutorCompletionService
> javadoc helpful. See
>
>
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html
>
> -Doug
>
>



       
               
__________________________________
Yahoo! Mail - PC Magazine Editors' Choice 2005
http://mail.yahoo.com
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Reply | Threaded
Open this post in threaded view
|

Re: Re: Help, how to design a pool of threads

Andrej Navodnik
In reply to this post by Joe Bowbeer
Hello Joe,

thank you very much for your reply. I will carefully
consider all your suggestions.

Back to my homework...
Andrei

--- Joe Bowbeer <[hidden email]> wrote:

> First, I'm trying to understand the threading
> scheme.  It looks like
> there is a producer that submits worker tasks to the
> pool and a
> consumer that processes the results.  I suggest you
> move the producer
> and consumer into the same thread.  Perhaps this
> could be the main
> thread.  That is, after you create the service, do
> the producer thing
> in the main thread next, submitting the worker tasks
> to the service;
> then do the consumer thing, processing the results.
> If the main
> thread won't do, move the producer and consumer to a
> single background
> thread.
>
> Moving the producer and consumer into a single
> thread leaves the
> thread pool dedicated to the worker tasks.  Easier
> to manage that
> way...
>
> > My questions are:
>
> > - how should I design my application so that the
> tasks
> > in the pool would be canceled/stopped after the
> > optimal element is found except for the task
> > that has found that "optimal element" - I
> wouldn't
> > like to see the message “Task is still doing
> useless
> > things” because it's actually using resources
> that
> > should be used fior preparing the final result;
>
> FutureTasks are cancelled by calling
> task.cancel(true);
>
> If the task is still in the queue, this will prevent
> the task from
> ever running.  If the task is already actively
> running, this will also
> interrupt the thread that's running the task.
>
> But the task must be responsive to interrupts.  That
> is, when
> interrupted, the task should promptly quit.  If the
> task could be
> waiting on a lock when it is interrupted, I
> recommend using
> lockInterruptibly().  If the task sleeps, then don't
> catch the
> InterruptedException, just let the task (ie,
> Callable.call) throw
> InterruptedException.  Finally, if the task is
> looping without
> blocking then it can check isCancelled() to see if
> it's been
> cancelled.
>
> Your cancelRunnables method removes tasks from the
> service's queue but
> does not attempt to cancel running tasks.  Would it
> work better to
> cancel all the tasks in the workers list?  If the
> service is dedicated
> to workers, though, then service.shutdownNow will do
> this for you.  It
> cancels all active tasks.
>
> > - should I concern about this problem or not?
>
> Yes.
>
> > - are there any other constructs that I could
> > use and achieve the same result?
> >
>
> Use a completion service work for the consumer
> thing?
>
> Joe.



       
               
__________________________________
Yahoo! Mail - PC Magazine Editors' Choice 2005
http://mail.yahoo.com
_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest