overview new features -> tim.

overview new features -> tim.

Peter Veentjer - Anchor Men
Sorry for this late reply, Tim.

>>  ... But there are some
>> parts where the concurrency library could be improved. One of those things
>> would be better control on timeout behaviour of the executorservices. That
>> is why I have created the BlockingExecutor that gives the control.
>>
>> public interface BlockingExecutor {
>>  void put(Runnable command) throws InterruptedException;
>>  boolean offer(Runnable command, long timeout, TimeUnit unit) throws InterruptedException;
>> }
>Do you have a small example that illustrates the use of BlockingExecutor to do
>something that is difficult or impossible to achieve conveniently with the
>standard task execution framework?

I'm currently working on a channels project, and I need control over the
timeout behaviour of the tasks handed to the BlockingExecutor. With the current
Executors I can't do a put/offer and pass timeout parameters (there is only a
single execute method, and it takes no timeout arguments), and that is what I
need.
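
To make the request concrete, here is a minimal sketch of one way such an executor could be built on top of the standard classes. It is not Peter's implementation: the class name SemaphoreBlockingExecutor, the maxPending parameter and the shutdown method are assumptions made for illustration. A Semaphore bounds the number of accepted-but-unfinished tasks, so put blocks and offer waits up to the given timeout for room.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// The interface quoted above.
interface BlockingExecutor {
    void put(Runnable command) throws InterruptedException;
    boolean offer(Runnable command, long timeout, TimeUnit unit) throws InterruptedException;
}

// Hypothetical implementation: a semaphore limits how many tasks may be
// pending or running; callers wait (optionally with a timeout) for a permit.
class SemaphoreBlockingExecutor implements BlockingExecutor {
    private final ExecutorService delegate;
    private final Semaphore permits;

    SemaphoreBlockingExecutor(int threads, int maxPending) {
        this.delegate = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(maxPending);
    }

    public void put(Runnable command) throws InterruptedException {
        permits.acquire();                 // block until there is room
        submit(command);
    }

    public boolean offer(Runnable command, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (!permits.tryAcquire(timeout, unit)) {
            return false;                  // timed out, task not accepted
        }
        submit(command);
        return true;
    }

    private void submit(final Runnable command) {
        try {
            delegate.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        permits.release(); // make room for the next task
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release();             // executor was already shut down
            throw e;
        }
    }

    void shutdown() {
        delegate.shutdown();
    }
}
```

With one permit and one long-running task, a second offer(task, 50, TimeUnit.MILLISECONDS) returns false instead of queuing or throwing, which is the kind of caller-side control the plain execute method does not provide.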


>> And why were the Takeable/Puttable/Channel interfaces/implementations
>> removed? I had to create my own library based on those interfaces and I
>> think the original code should have made it into Java 5. JMS is too
>> heavy... and the removed code was perfect.
>
>The method names haven't changed, only the class name.
>
>  Channel -> BlockingQueue
>  Takeable -> the take and poll methods of BlockingQueue
>  Puttable -> the put and offer methods of BlockingQueue
>
>There are no separate interfaces to describe the puttable side of a
>BlockingQueue from its takeable side, but you can provide simple wrappers to
>achieve the same effect.

I don't think that is a very good solution, because the exchange of messages is
combined with the storage of messages.

You can see my channels project here:

http://members.home.nl/peter-veentjer01/index.htm

I want to have total control and don't want to drag along a lot of garbage.

And sorry for this messy reply. My e-mail client (Outlook) doesn't produce a
good reply and is terrible at layout.

 


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: overview new features -> tim.

Doug Lea
Peter Veentjer - Anchor Men wrote:
>
> I don't think that is a very good solution because the exchange of messages
> is combined with storage of messages.
>

That was my original rationale for dl.util.concurrent versions. But the
rest of the world (including, now, me) disagrees. Support for standard
Collection methods was probably the most frequently requested feature.
Integration into Collections makes these classes more widely usable. And
(sorry to say) the only people inconvenienced by it can cope
with this a lot more easily than if it were the other way around.

-Doug
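
As an illustration of the point about Collections integration, the sketch below uses one BlockingQueue object both as a channel (put) and as a Collection (addAll), then empties it with a single bulk call. The class and method names (CollectionViewDemo, fillAndDrain) are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CollectionViewDemo {
    // Fills the queue through both APIs, then drains it in one bulk call.
    static List<String> fillAndDrain(BlockingQueue<String> queue) throws InterruptedException {
        queue.put("a");                          // channel-style: blocking hand-off
        queue.put("b");
        queue.addAll(Arrays.asList("c", "d"));   // Collection-style: bulk add
        List<String> batch = new ArrayList<String>();
        queue.drainTo(batch);                    // BlockingQueue bulk removal
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        System.out.println(fillAndDrain(queue)); // [a, b, c, d]
        System.out.println(queue.isEmpty());     // true
    }
}
```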

RE: overview new features -> tim.

Peter Veentjer - Anchor Men
In reply to this post by Peter Veentjer - Anchor Men
I like to have a small interface, so that implementations are easy to write
and easy to reason about. If I need buffering (queuing) I can chain (decorate)
channels; the same goes for other functionality such as regulating
(closing/opening channels) or monitoring. There is no reason this
functionality should be declared in the root interface.

Example of chaining:
OutputChannel c = someChannel; // every channel implements InputChannel and OutputChannel
c = new MonitoringOutputChannel(c);
c = new LoggingOutputChannel(c);
Channel newChannel = new ComposedChannel(c,someChannel);

In this example a new channel (with logging and monitoring) is created
based on someChannel. This works great and a wide root interface
isn't required.
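
The chaining above can be sketched with a deliberately narrow root interface. Everything below is an assumption for illustration, not code from the channels project: the one-method OutputChannel, the counter in MonitoringOutputChannel, and the caller-supplied log list in LoggingOutputChannel.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Assumed minimal root interface; the real one may declare more methods.
interface OutputChannel<E> {
    void put(E msg) throws InterruptedException;
}

// Decorator that counts messages before passing them on.
class MonitoringOutputChannel<E> implements OutputChannel<E> {
    private final OutputChannel<E> target;
    private final AtomicLong count = new AtomicLong();

    MonitoringOutputChannel(OutputChannel<E> target) {
        this.target = target;
    }

    public void put(E msg) throws InterruptedException {
        count.incrementAndGet();
        target.put(msg);
    }

    long getCount() {
        return count.get();
    }
}

// Decorator that records one log line per message before passing it on.
// The caller supplies the list (a thread-safe one if several threads put).
class LoggingOutputChannel<E> implements OutputChannel<E> {
    private final OutputChannel<E> target;
    private final List<String> log;

    LoggingOutputChannel(OutputChannel<E> target, List<String> log) {
        this.target = target;
        this.log = log;
    }

    public void put(E msg) throws InterruptedException {
        log.add("put: " + msg);
        target.put(msg);
    }
}
```

Each decorator only knows the narrow interface, so buffering, logging and monitoring can be stacked in any order without widening it.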

________________________________

From: Doug Lea [mailto:[hidden email]]
Sent: Tue 9/6/2005 1:48
To: Peter Veentjer - Anchor Men
Cc: [hidden email]
Subject: Re: [concurrency-interest] overview new features -> tim.




Re: overview new features -> tim.

tpeierls
In reply to this post by Doug Lea
> Peter Veentjer - Anchor Men wrote:
>> I don't think that is a very good solution because the exchange of
>> messages is combined with storage of messages.
>
Doug Lea wrote:
> That was my original rationale for dl.util.concurrent versions. But the
> rest of the world (including, now, me) disagrees. Support for standard
> Collection methods was probably the most frequently requested feature.
> Integration into Collections makes these classes more widely usable. And
> (sorry to say) the only people inconvenienced by it can cope
> with this a lot more easily than if it were the other way around.

And if you're having trouble coping, just do this:

   public interface Puttable<E> {
       boolean offer(E o);
       boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException;
       void put(E o) throws InterruptedException;
   }

   public interface Takeable<E> {
       E poll();
       E poll(long timeout, TimeUnit unit) throws InterruptedException;
       E take() throws InterruptedException;
   }

   public class PTAdapters {
       public static <E> Puttable<E> asPuttable(final BlockingQueue<E> q) {
           return new Puttable<E>() {
               public boolean offer(E o) { return q.offer(o); }
               public boolean offer(E o, long timeout, TimeUnit unit)
                   throws InterruptedException { return q.offer(o, timeout, unit); }
               public void put(E o) throws InterruptedException { q.put(o); }
           };
       }
       public static <E> Takeable<E> asTakeable(final BlockingQueue<E> q) {
           return new Takeable<E>() {
               public E poll() { return q.poll(); }
               public E poll(long timeout, TimeUnit unit)
                   throws InterruptedException { return q.poll(timeout, unit); }
               public E take() throws InterruptedException { return q.take(); }
           };
       }
       private PTAdapters() {} // uninstantiable
   }

   // sample use (assumes static imports of Executors.* and PTAdapters.*)
   class Producer implements Runnable {
       Producer(Puttable<Long> p) {...}
       ...
   }
   class Consumer implements Runnable {
       Consumer(Takeable<Long> t) {...}
       ...
   }
   BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
   ExecutorService exec = newFixedThreadPool(2); // ExecutorService, since Executor has no shutdown()
   exec.execute(new Producer(asPuttable(queue)));
   exec.execute(new Consumer(asTakeable(queue)));
   exec.shutdown();

There, now you have them back. :-)

--tim


RE: overview new features -> tim.

Peter Veentjer - Anchor Men
In reply to this post by Peter Veentjer - Anchor Men
I'm going to work, have a nice cup of coffee and then I'll
have a good look at it (looks nice).

________________________________

From: Tim Peierls [mailto:[hidden email]]
Sent: Tue 9/6/2005 8:11
To: Peter Veentjer - Anchor Men
Cc: Doug Lea; [hidden email]
Subject: Re: [concurrency-interest] overview new features -> tim.




RE: overview new features -> tim.

Peter Veentjer - Anchor Men
In reply to this post by Peter Veentjer - Anchor Men
Hi Tim,

Your solution looks a lot like mine. I have created Input/OutputChannel
instead of Puttable/Takeable. I find Puttable and Takeable easier to
understand (putting things into an output channel and retrieving things from
an input channel feels strange at first), but InputChannel and OutputChannel
are well-known names, so I didn't want to invent new ones.

The PTAdapters are nice. I have created something similar:
the BufferedChannel.

Example:
Channel c = new BufferedChannel(new LinkedBlockingQueue()); // BlockingQueue is an interface, so a concrete queue is needed
InputChannel in = c;
OutputChannel out = c;

And the threading part... I struggled with it in the
beginning, but I have found the following solutions:

1---------------------------------------
I can make channels active:
OutputChannel c = new DoSomeHeavyCalculatingOutputChannel(..);
ActiveOutputChannel a = new ActiveOutputChannel(c,someExecutorService);

If a message is put/offered on a, a new Runnable is submitted to
someExecutorService. The thread that called put/offer is not used for
putting the message on c; a thread from someExecutorService is used
instead.

2---------------------------------------
I have also created something new recently: the Repeater.
Example:

Channel fetchChannel = new StdBufferedChannel();
Channel protocolChan = new StdBufferedChannel();
Channel parseDataChan = new StdBufferedChannel();

RepeaterService fetchRepeater = new StdRepeaterService(40); // 40 threads
fetchRepeater.start();
BetterFetcher fetcher = new BetterFetcher(fetchChannel, protocolChan, fetchRepeater);
fetcher.start();

RepeaterService analyseRepeater = new StdRepeaterService();
analyseRepeater.start();
Analyzer analyzer = new Analyzer(protocolChan, parseDataChan, analyseRepeater);
analyzer.start();

RepeaterService downManRepeater = new StdRepeaterService();
downManRepeater.start();
DownloadManager downMan = new DownloadManager(parseDataChan, fetchChannel, downManRepeater);
downMan.start();

In this example there are 3 channels:
fetchChannel (you can put requests for pages to download in here)
protocolChan (the fetcher puts the downloaded pages in here)
parseDataChan (the analyzer puts the analyzed pages (with outlinks) in here)

The Repeaters can be compared to a thread pool, but they keep
executing the same task over and over again. In this example
the repeaters keep pulling messages from the channels
and processing them.
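
The repeater idea can be sketched on top of a standard fixed thread pool. This is not the StdRepeaterService from the prototype: the class SimpleRepeater, its repeat method and its stop method are hypothetical names, and the stop-by-interrupt policy is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical repeater: every pool thread runs the same task in a loop.
class SimpleRepeater {
    private final ExecutorService pool;
    private final int threads;
    private volatile boolean running = true;

    SimpleRepeater(int threads) {
        this.threads = threads;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Starts one loop per thread; each loop re-runs the task until stopped.
    void repeat(final Runnable task) {
        for (int i = 0; i < threads; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    while (running && !Thread.currentThread().isInterrupted()) {
                        task.run();
                    }
                }
            });
        }
    }

    // Clears the running flag and interrupts the workers, so that a task
    // blocked in take() can return; then waits for the loops to finish.
    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        running = false;
        pool.shutdownNow();
        return pool.awaitTermination(timeout, unit);
    }
}
```

A task that blocks on a channel's take() needs no loop of its own; the repeater supplies the loop, and the task only has to return when it is interrupted.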

This is the code of the BetterFetcher:

public class BetterFetcher {

        private final InputChannel _channel;
        private final Repeater _repeater;
        private final Channel _protocolOutputChannel;

        public BetterFetcher(InputChannel channel, Channel protocolOutputChannel, Repeater repeater) {
                if (channel == null || protocolOutputChannel == null || repeater == null)
                        throw new NullPointerException();
                _channel = channel;
                _repeater = repeater;
                _protocolOutputChannel = protocolOutputChannel;
        }

        public void start() throws InterruptedException {
                _repeater.repeat(new RunnableImpl());
        }

        private class RunnableImpl implements Runnable {

                // Takes one fetch request, downloads the page and passes it on.
                private void runInterruptibly() throws InterruptedException {
                        FetchListEntry fle = (FetchListEntry) _channel.take();
                        String url = fle.getPage().getURL().toString();
                        System.out.println("fetching url: " + url);
                        Protocol protocol = null;
                        try {
                                try {
                                        protocol = ProtocolFactory.getProtocol(url);
                                } catch (RuntimeException ex) {
                                        ex.printStackTrace();
                                        throw ex;
                                }

                                ProtocolOutput output = protocol.getProtocolOutput(fle);
                                System.out.println("page successfully retrieved");
                                _protocolOutputChannel.put(new X(fle, output));
                        } catch (ProtocolNotFound protocolNotFound) {
                                System.out.println("no protocol found");
                                protocolNotFound.printStackTrace();
                        }
                }

                public void run() {
                        try {
                                runInterruptibly();
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                                // Restore the interrupt status instead of swallowing it.
                                Thread.currentThread().interrupt();
                        }
                }
        }
}


At the moment I'm experimenting with the repeaters in a prototype,
but the ActiveChannels have been used in a couple of projects and
I'm totally in love with this channeled approach.





RE: overview new features -> tim.

Peter Veentjer - Anchor Men
In reply to this post by Peter Veentjer - Anchor Men

You asked about a use case for the BlockingExecutor.
The someExecutorService from my threading example is a BlockingExecutor.

OutputChannel c = new DoSomeHeavyCalculatingOutputChannel(..);
BlockingExecutor blockingExecutor = new BlockingExecutor(10); // 10 threads
ActiveOutputChannel activeChannel = new ActiveOutputChannel(c, blockingExecutor);

That is where I need the control over timeouts. If a message is offered on
activeChannel, it is offered with the same arguments to the BlockingExecutor.

Example of put:
activeChannel.put(msg) results in blockingExecutor.put(new PutterRunnable(msg));

Example of offer:
activeChannel.offer(msg,timeout,unit) results in
blockingExecutor.offer(new PutterRunnable(msg),timeout,unit);
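
The forwarding described above could look roughly like the sketch below. The shape of OutputChannel (put plus a timed offer) and the body of PutterRunnable are assumptions; only the BlockingExecutor interface and the put/offer mapping come from this thread.

```java
import java.util.concurrent.TimeUnit;

// Assumed shape of a channel's output side; the real interface may differ.
interface OutputChannel<E> {
    void put(E msg) throws InterruptedException;
    boolean offer(E msg, long timeout, TimeUnit unit) throws InterruptedException;
}

// The interface from the first message in this thread.
interface BlockingExecutor {
    void put(Runnable command) throws InterruptedException;
    boolean offer(Runnable command, long timeout, TimeUnit unit) throws InterruptedException;
}

// Hypothetical active channel: the caller only hands a task to the executor;
// an executor thread performs the actual put on the target channel.
class ActiveOutputChannel<E> implements OutputChannel<E> {
    private final OutputChannel<E> target;
    private final BlockingExecutor executor;

    ActiveOutputChannel(OutputChannel<E> target, BlockingExecutor executor) {
        this.target = target;
        this.executor = executor;
    }

    public void put(E msg) throws InterruptedException {
        executor.put(new PutterRunnable(msg));
    }

    public boolean offer(E msg, long timeout, TimeUnit unit) throws InterruptedException {
        // The caller's timeout bounds the wait for the executor to accept the task.
        return executor.offer(new PutterRunnable(msg), timeout, unit);
    }

    private class PutterRunnable implements Runnable {
        private final E msg;

        PutterRunnable(E msg) {
            this.msg = msg;
        }

        public void run() {
            try {
                target.put(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the worker
            }
        }
    }
}
```

Note that in this sketch the timeout only covers acceptance by the executor; the put on the target channel then runs on a worker thread without a time limit.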
