CompletionService and I/O - can I use it?

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

CompletionService and I/O - can I use it?

David Harrigan
Hi,

Another posting ;) I appreciate all the help I'm getting in this to further my (and maybe other's) understanding.

Rethinking my problem, I have decided to take the advice and use a CompletionService to hand off the searchers, and to wait for their results, so I have something like this:

...
private final CompletionService<Searcher> ecs = ExecutorCompletionService<Searcher>(Executors.newCachedThreadPool());

public void search(final List<Searcher> searchers) {
    for(Searcher searcher : searchers) {
        ecs.submit(searcher);
    }

    for(int i = 0, j = searchers.size() ; i < j ; i++) {
        try {
            Future<Searcher> result = ecs.poll(1000, TimeUnit.MILLISECONDS);
            if(result != null) {
              Searcher s = result.get();
              // do something with s now...
            }
        } catch(InterruptedException e) {
           Thread.currentThread().interrupt();
        } catch(ExecutionException e) {
           e.printStackTrace();
        }
}

This I think gives me the ability to poll each searcher for 1 second. But I have two
questions:

1. A searcher makes a connection to the outside world, this could take more than 1
second, or it could take less than 1 second. If it takes more than 1 second, fine, then
it's dead and I don't care about it, but say it makes a connection within 1 second, but
then takes another 2 seconds to retrieve the results, the method above will timeout
the searcher and I won't be able to do anything with it. What I would ideally like is
the ability to poll for the searcher, then if it's doing something, to wait for it to
complete. Can I somehow shove it back on the queue? Or to "peek" inside the
queue to see if my searcher is doing anything interesting?

2. Perhaps using a CompletionService isn't appropriate in this case? You see, I have
3 states a search can move thru:

Connected but not reading
Reading
Finished

each of those states could timeout. I can control the first one, but I need to be
able to pause acting on the searcher if it's reading for another bit of time, but
eventually to timeout if reading is taking too long...

Has anyone else come across this situation (it must be common! - it's a basic
I/O read/wait time of situation) and using threads to control it? I would appreciate
any help/advice on this.

-=david=-
Reply | Threaded
Open this post in threaded view
|

Re: CompletionService and I/O - can I use it?

tpeierls
A completion service is appropriate if you want to get results from tasks as they complete (either by having a result or by throwing an exception). The completion service returns completed tasks; the timed poll method waits the given duration for any task to become available by completing, not one particular task.

You are submitting Searcher instances to a CompletionService<Searcher>, which means that Searcher must implement Callable<Searcher>. This could be part of a design where the call method of the Searcher establishes the connection and reads some data, throwing an exception if either of these activities take too long. Otherwise, if connection and reading complete successfully, the call method returns the searcher itself. Then a sequence of ready-to-use Searchers could be obtained by calling the following method in a loop:

Searcher getNextReadySearcher(long deadlineNanos)
            throws TimeoutException, InterruptedException {
    while (true) {
        try {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos > 0) {
                Future<Searcher> f = ecs.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (f != null) return f.get();
            }
            throw new TimeoutException("deadline passed");
        } catch (ExecutionException e) {
            // could resubmit if exception refers to Searcher
            if (e.getCause() instanceof SearcherException) {
                SearcherException se = (SearcherException) e.getCause();
                ecs.submit(se.getSearcher());
            }
            // retry loop
        }
    }
}

For an example of doing work with a time budget, see Section 6.3.7 of Java Concurrency in Practice. For an example of nonstandard cancellation within the task execution framework, see Section 7.1.7.

--tim


On 10/27/06, David Harrigan <[hidden email]> wrote:

Hi,

Another posting ;) I appreciate all the help I'm getting in this to further
my (and maybe other's) understanding.

Rethinking my problem, I have decided to take the advice and use a
CompletionService to hand off the searchers, and to wait for their results,
so I have something like this:

...
private final CompletionService<Searcher> ecs =
ExecutorCompletionService<Searcher>(Executors.newCachedThreadPool());

public void search(final List<Searcher> searchers) {
    for(Searcher searcher : searchers) {
        ecs.submit(searcher);
    }

    for(int i = 0, j = searchers.size() ; i < j ; i++) {
        try {
            Future<Searcher> result = ecs.poll (1000, TimeUnit.MILLISECONDS);
            if(result != null) {
              Searcher s = result.get();
              // do something with s now...
            }
        } catch(InterruptedException e) {
           Thread.currentThread().interrupt();
        } catch(ExecutionException e) {
           e.printStackTrace();
        }
}

This I think gives me the ability to poll each searcher for 1 second. But I
have two
questions:

1. A searcher makes a connection to the outside world, this could take more
than 1
second, or it could take less than 1 second. If it takes more than 1 second,
fine, then
it's dead and I don't care about it, but say it makes a connection within 1
second, but
then takes another 2 seconds to retrieve the results, the method above will
timeout
the searcher and I won't be able to do anything with it. What I would
ideally like is
the ability to poll for the searcher, then if it's doing something, to wait
for it to
complete. Can I somehow shove it back on the queue? Or to "peek" inside the
queue to see if my searcher is doing anything interesting?

2. Perhaps using a CompletionService isn't appropriate in this case? You
see, I have
3 states a search can move thru:

Connected but not reading
Reading
Finished

each of those states could timeout. I can control the first one, but I need
to be
able to pause acting on the searcher if it's reading for another bit of
time, but
eventually to timeout if reading is taking too long...

Has anyone else come across this situation (it must be common! - it's a
basic
I/O read/wait time of situation) and using threads to control it? I would
appreciate
any help/advice on this.


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest