implementing a DB write-behind algorithm


implementing a DB write-behind algorithm

Alexandru Popescu ☀
Hi!

I have to confess up front that when it comes to concurrency-related
problems, I get confused quite quickly :-).

The current problem I am trying to solve: how to implement a DB
write-behind strategy. Multiple processes will post records to be
written to the DB, but the actual writes should happen on a separate
process. So far I have been thinking about two possible approaches:
a) continuous write-behind: multiple processes write to a queue which
is continuously polled by a separate process. When an element is found
on the queue, the write process removes it from the queue and
attempts to write it to the DB.

To have this done, I was looking in the direction of ConcurrentLinkedQueue.

b) batched write-behind: multiple processes post to a size-bounded
queue. When the max size is reached, the original queue is passed to
the parallel write process and replaced with a new queue.

To have this done, I was looking in the direction of
LinkedBlockingQueue with an additional atomic operation of swapping
the old queue with the new empty one.

My question is: am I looking in the right direction, or am I completely
wrong? Any ideas and help are highly appreciated.

./alex
--
.w( the_mindstorm )p.

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest

Re: implementing a DB write-behind algorithm

tpeierls
On 4/30/06, Alexandru Popescu <[hidden email]> wrote:
I have to confess up front that when it comes to concurrency-related
problems, I get confused quite quickly :-).

You're not alone! :-)
 

The current problem I am trying to solve: how to implement a DB
write-behind strategy. Multiple processes will post records to be
written to the DB, but the actual writes should happen on a separate
process. So far I have been thinking about two possible approaches:
a) continuous write-behind: multiple processes write to a queue which
is continuously polled by a separate process. When an element is found
on the queue, the write process removes it from the queue and
attempts to write it to the DB.

To have this done, I was looking in the direction of ConcurrentLinkedQueue.

b) batched write-behind: multiple processes post to a size-bounded
queue. When the max size is reached, the original queue is passed to
the parallel write process and replaced with a new queue.

To have this done, I was looking in the direction of
LinkedBlockingQueue with an additional atomic operation of swapping
the old queue with the new empty one.

My question is: am I looking in the right direction, or am I completely
wrong? Any ideas and help are highly appreciated.

The use of BlockingQueue.put makes it possible to implement strategies that make the caller block, while still permitting strategies that don't block. So I would avoid ConcurrentLinkedQueue here, because it does not implement the BlockingQueue interface.

You can use an unbounded LinkedBlockingQueue for continuous write-behind, and ArrayBlockingQueue (always bounded) for batched write-behind. Instead of swapping in a new queue, the consumer thread could just poll until the batch size was reached (using a timeout to avoid the risk of batches never completing), and then send the batch. The batch size need not be the same as the queue capacity.

Here's an uncompiled, untested fragment that illustrates the idea:

import java.util.*;
import java.util.concurrent.*;
import net.jcip.annotations.GuardedBy;

public interface WriteBehind<T> {
    void put(T record) throws InterruptedException;
}

public interface RecordWriter<T> {
    void write(List<T> records) throws InterruptedException;
}

abstract class AbstractWriteBehind<T> implements WriteBehind<T> {
    private final BlockingQueue<T> queue;
    private final RecordWriter<T> writer;
    @GuardedBy("this") private Future<Void> consumer = null;

    protected AbstractWriteBehind(BlockingQueue<T> queue, RecordWriter<T> writer) {
        this.queue = queue;
        this.writer = writer;
    }

    class Consumer implements Callable<Void> {
        public Void call() throws InterruptedException {
            consume(queue, writer);
            return null;
        }
    }

    public synchronized void start() {
        if (consumer == null) {
            ExecutorService exec = Executors.newSingleThreadExecutor();
            try {
                consumer = exec.submit(new Consumer());
            } finally {
                exec.shutdown();
            }
        }
    }

    public synchronized boolean isRunning() {
        return consumer != null;
    }

    public synchronized void stop() {
        if (consumer != null) {
            consumer.cancel(true);
            consumer = null;
        }
    }

    public final void put(T record) throws InterruptedException {
        queue.put(record);
    }

    protected abstract void consume(BlockingQueue<T> queue, RecordWriter<T> writer)
        throws InterruptedException;
}

class ContinuousWriteBehind<T> extends AbstractWriteBehind<T> {
    ContinuousWriteBehind(RecordWriter<T> writer) {
        super(new LinkedBlockingQueue<T>(), writer);
    }

    protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
            throws InterruptedException {
        // take() blocks rather than returning null, so this loop runs
        // until stop() cancels (interrupts) the consumer.
        for (T rec; (rec = q.take()) != null; )
            writer.write(Collections.singletonList(rec));
    }
}

class BatchedWriteBehind<T> extends AbstractWriteBehind<T> {
    private final int maxBuf;
    private final List<T> buf;
    private final long time;
    private final TimeUnit unit;

    BatchedWriteBehind(RecordWriter<T> writer, int capacity, int maxBuf,
                       long time, TimeUnit unit) {
        super(new ArrayBlockingQueue<T>(capacity), writer);
        this.maxBuf = maxBuf;
        this.buf = new ArrayList<T>(maxBuf);
        this.time = time;
        this.unit = unit;
    }

    protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
            throws InterruptedException {
        for (T rec; (rec = q.take()) != null; ) {
            buf.add(rec);
            // Keep polling (with timeout) until the batch is full or the
            // queue stays empty for the timeout period, then flush.
            while (buf.size() < maxBuf && (rec = q.poll(time, unit)) != null)
                buf.add(rec);
            writer.write(buf);  // writer must not retain the buffer
            buf.clear();
        }
    }
}

--tim


Re: implementing a DB write-behind algorithm

Alexandru Popescu ☀
Hi Tim!

And thanks for the first comments :-).

My intention is mainly to minimize any/most of the locking on the
writers, so responsiveness is maximal on that side (the reason for
looking at ConcurrentLinkedQueue).

Thanks also for the code sample. It makes a lot of sense to me. However,
I have a few comments (in case I got it right):
- considering that LinkedBlockingQueue uses different locks for
put/take, it looks like there is no penalty introduced by the
consumer. Am I getting this right?
- it looks like the batched writer is doing a continuous job of
polling the queue. I was thinking that maybe I can find a way for
this batched writer to do its job only when the limit is reached.

Considering that while adding elements to the queue I can determine
the remaining capacity of the queue, I might trigger the writer
process manually and pass it the content of the current queue
(probably using drainTo() for this).
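
That drainTo() idea could be sketched roughly like this (a single-threaded, self-contained toy, not from the original thread: writeBatch() stands in for handing the batch to the write-behind process, and the trigger threshold is arbitrary; note that with multiple producers, remainingCapacity() is only a snapshot, so two producers could race to drain):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainToSketch {
    static final int CAPACITY = 8;
    static final int TRIGGER = 2;  // drain when fewer than 2 free slots remain
    static final List<List<Integer>> batches = new ArrayList<List<Integer>>();

    // Stand-in for handing the drained batch to the writer process.
    static void writeBatch(List<Integer> batch) {
        batches.add(batch);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(CAPACITY);
        for (int i = 0; i < 20; i++) {
            queue.put(i);
            // Producer-side trigger: when the queue is nearly full,
            // drain its current contents and pass them to the writer.
            if (queue.remainingCapacity() < TRIGGER) {
                List<Integer> batch = new ArrayList<Integer>();
                queue.drainTo(batch);
                writeBatch(batch);
            }
        }
        // Flush whatever is left at the end.
        List<Integer> tail = new ArrayList<Integer>();
        queue.drainTo(tail);
        if (!tail.isEmpty()) writeBatch(tail);
        System.out.println(batches.size() + " batches");
    }
}
```

With a capacity of 8 and a trigger at 2 free slots, 20 records come out as two full drains of 7 plus one final flush of 6.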

./alex
--
.w( the_mindstorm )p.




Re: implementing a DB write-behind algorithm

Alexandru Popescu ☀
... and another thing (of which I am not sure, but my gut feeling says
so)... the writer process takes longer to process than the queue
posters, and the solution would lead to blocking all posters until the
writer finishes - because of the size bound. (Hope I explained it well
enough to be understandable :-).) Swapping in a new queue may solve
this issue.

./alex
--
.w( the_mindstorm )p.





Re: implementing a DB write-behind algorithm

Richie.Jefts

If new jobs are queued faster than the writers can write, there is not a whole lot you can do. Something has to give. You can have the producers wait until the writer is ready (which currently happens), have the producers use a CallerRunsPolicy, or throw away the produced item. Swapping the queues won't solve the fundamental issue that producers are faster than consumers.
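
The CallerRunsPolicy option falls out of ThreadPoolExecutor directly: when the writer's work queue fills up, the rejected task runs in the producer's own thread, which throttles producers without dropping records. A minimal self-contained sketch (the counter stands in for the actual DB write):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsSketch {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger written = new AtomicInteger();
        // One writer thread, a small work queue, and CallerRunsPolicy:
        // when the queue is full, the producer executes the write itself
        // instead of blocking or throwing RejectedExecutionException.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(4),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 100; i++) {
            pool.execute(new Runnable() {
                public void run() { written.incrementAndGet(); } // pretend DB write
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("wrote " + written.get());
    }
}
```

Every record is written exactly once; the only thing that varies under load is which thread does the writing.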

You could also have the writers use a thread pool if writes can happen concurrently. In this case, each thread of the continuous write-behind can pretty much do what Tim suggested: read from the queue and write the data. The BatchedWriteBehind you could modify to something like:

// itemQueue would need to be a field so the writer threads can see it;
// capacity, maxBuf, time, and unit are the fields from Tim's version.
private final BlockingQueue<List<T>> itemQueue =
    new ArrayBlockingQueue<List<T>>(capacity);

protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
        throws InterruptedException {
    for (T rec; (rec = q.take()) != null; ) {
        List<T> buf = new ArrayList<T>();
        buf.add(rec);
        while (buf.size() < maxBuf && (rec = q.poll(time, unit)) != null)
            buf.add(rec);
        itemQueue.put(buf);  // put, not add: block instead of throwing when full
    }
}


then have threads on the writer side do:

public void run() {
    try {
        for (List<T> list; (list = itemQueue.take()) != null; )
            writer.write(list);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // run() can't rethrow; exit on shutdown
    }
}

That would ensure each writer gets a full batch of data.
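
One wrinkle in the snippets above: take() never returns null, so the `!= null` loops never exit normally. A self-contained sketch (not from the original post) of the writer-pool side, using an empty sentinel batch (a "poison pill", one per writer) to shut the threads down cleanly; the counter stands in for writer.write:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WriterPoolSketch {
    // Sentinel batch used to tell a writer thread to exit,
    // since take() blocks instead of returning null.
    static final List<Integer> POISON = Arrays.asList();

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<List<Integer>> itemQueue =
                new LinkedBlockingQueue<List<Integer>>();
        final AtomicInteger total = new AtomicInteger();

        int nWriters = 3;
        Thread[] writers = new Thread[nWriters];
        for (int i = 0; i < nWriters; i++) {
            writers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        List<Integer> batch;
                        while ((batch = itemQueue.take()) != POISON)
                            total.addAndGet(batch.size()); // pretend writer.write(batch)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            writers[i].start();
        }

        for (int b = 0; b < 10; b++)
            itemQueue.put(Arrays.asList(b, b, b)); // 10 batches of 3 records
        for (int i = 0; i < nWriters; i++)
            itemQueue.put(POISON);                 // one pill per writer
        for (Thread t : writers) t.join();

        System.out.println("records written: " + total.get());
    }
}
```

Whichever writer happens to take a given batch, all 30 records get written exactly once before the pool shuts down.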

richie



"Alexandru Popescu" <[hidden email]>
Sent by: [hidden email]

04/30/2006 02:14 PM

To
[hidden email]
cc
evo1 <[hidden email]>, Tim Peierls <[hidden email]>
Subject
Re: [concurrency-interest] implementing a DB write-behind algorithm








Re: implementing a DB write-behind algorithm

Alexandru Popescu ☀


On 4/30/06, [hidden email] <[hidden email]> wrote:

If new jobs are queued faster than the writers can write, there is not a whole lot you can do. Something has to give. You can have the producers wait until the writer is ready (which currently happens), have the producers use a CallerRunsPolicy, or throw away the produced item.

 

Swapping the queues won't solve the fundamental issue that producers are faster than consumers.

Sorry, but I am missing the reason why... the producers will continue to post on the empty queue, while the writer(s) continue to write. Balancing the two will keep the producers from blocking.
 

You could also make the writers use a threadpool if writes can happen concurrently.

I missed mentioning this part: yes writes can happen concurrently.

./alex
--
.w( the_mindstorm )p.
 

In this case, each thread of the continuous write behind can pretty much do what Tim suggested, read from queue and write the data. The BatchedWriteBehind you could modify to something like:


protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
   throws InterruptedException {


    BlockingQueue<List<T>> itemQueue = new ArrayBlockingQueue<List<T>>(capacity);
   
   for (T rec; (rec = q.take()) != null; ) {

          List<T> buf = new ArrayList<T>();

       buf.add(rec);
       while (buf.size() < maxBuf && (rec = q.poll(time, unit)) != null)
           buf.add(rec);


        itemQueue.add(buf);
   }
}


then have threads on the writer side do:

public void run() {
    for (List<T> list; (list = itemQueue.take()) != null; )
       writer.write(list);

}

That would ensure each writer gets a full batch of data.

richie



"Alexandru Popescu" <[hidden email]>
Sent by: [hidden email]

04/30/2006 02:14 PM

To
[hidden email]
cc
evo1 <[hidden email]>, Tim Peierls <[hidden email]>
Subject
Re: [concurrency-interest] implementing a DB write-behind algorithm







... and another thing (for which I am not sure, but my gut feeling is
saying so)... the writter process is taking longer time to process
than the queue posters, and the solution would lead to block all
posters till the writter finishes - because of the size-bound. (hope I
explained it good enough to be understandable :-) ). Swapping a new
queue may solve this issue.

./alex
--
.w( the_mindstorm )p.



On 4/30/06, Alexandru Popescu <[hidden email]> wrote:
> Hi Tim!
>
> And thanks for the first comments :-).
>
> My intention is mainly to minimize any/most of the locks of the
> writers, so responsiveness is maximum on this side (the reason for
> looking at ConcurrentLinkedQueue).
>
> Thanks also for the code sample. It makes a lot of sense to me. However,
> I have a few comments (in case I got it right):
> - considering that LinkedBlockingQueue uses different locks for
> put/take, it looks like there is no penalty introduced by the
> consumer. Am I getting this right?
> - it looks like the batched writer is doing a continuous job of
> polling the queue. I was thinking that maybe I can find a way for
> this batched writer to do its job only when the limit is reached.
>
> Considering that while adding elements to the queue I can determine
> the remaining capacity of the queue, I might trigger the writer
> process manually and pass it the contents of the current queue
> (probably using drainTo() for this).
>
> ./alex
> --
> .w( the_mindstorm )p.
>
>
>
> On 4/30/06, Tim Peierls <[hidden email]> wrote:
> > On 4/30/06, Alexandru Popescu
> > <[hidden email] > wrote:
> >
> > > I firstly have to confess that when getting to concurrency related
> > > problems, I am getting confused quite quickly :-).
> >
> >
> > You're not alone! :-)
> >
> >
> > > Now, the current problem I am trying to solve is: I am trying to
> > > figure out how to implement a DB write-behind strategy. Multiple
> > > processes will post records to be written to the DB, but the actual
> > > writes should happen on a separate process. So far, I was thinking
> > > about 2 possible approaches:
> > > a) continuous write-behind: multiple processes write to a queue which
> > > is continuously polled by a separate process. When an element is found
> > > on the queue, the write process removes it from the queue and
> > > attempts to write it to the DB.
> > >
> > > To have this done, I was looking in the direction of
> > ConcurrentLinkedQueue.
> > >
> > > b) batched write-behind: multiple processes post to a size-bounded
> > > queue. When the max size is reached, the original queue is passed to
> > > the parallel write process and replaced with a new queue.
> > >
> > > To have this done, I was looking in the direction of
> > > LinkedBlockingQueue with an additional atomic operation of swapping
> > > the old queue with the new empty one.
> > >
> > > My question is: am I looking in the right direction or I am completely
> > > wrong. Any ideas and help are highly appreciated.
> > >
> >
> > The use of BlockingQueue.put makes it possible to implement strategies that
> > make the caller block, while still permitting strategies that don't block.
> > So I would avoid ConcurrentLinkedQueue here, because it does not implement
> > the BlockingQueue interface.
> >
> > You can use an unbounded LinkedBlockingQueue for continuous write-behind,
> > and ArrayBlockingQueue (always bounded) for batched write-behind. Instead of
> > swapping in a new queue, the consumer thread could just poll until the batch
> > size was reached (using a timeout to avoid the risk of batches never
> > completing), and then send the batch. The batch size need not be the same as
> > the queue capacity.
> >
> > Here's an uncompiled, untested fragment that illustrates the idea:
> >
> >  public interface WriteBehind<T> {
> >     void put(T record) throws InterruptedException;
> > }
> >
> > public interface RecordWriter<T> {
> >     void write(List<T> records) throws InterruptedException;
> >  }
> >
> > abstract class AbstractWriteBehind<T> implements WriteBehind<T> {
> >     private final BlockingQueue<T> queue;
> >     private final RecordWriter<T> writer;
> >     @GuardedBy("this") private Future<Void> consumer = null;
> >
> >     protected AbstractWriteBehind(BlockingQueue<T> queue,
> >                                   RecordWriter<T> writer) {
> >         this.queue = queue;
> >         this.writer = writer;
> >     }
> >
> >     class Consumer implements Callable<Void> {
> >         public Void call() throws InterruptedException {
> >             consume(queue, writer);
> >             return null;
> >         }
> >     }
> >
> >     public synchronized void start() {
> >         if (consumer == null) {
> >             ExecutorService exec = Executors.newSingleThreadExecutor();
> >             try {
> >                 consumer = exec.submit(new Consumer());
> >             } finally {
> >                 exec.shutdown();
> >             }
> >         }
> >     }
> >
> >     public synchronized boolean isRunning() {
> >         return consumer != null;
> >     }
> >
> >     public synchronized void stop() {
> >         if (consumer != null) {
> >             consumer.cancel(true);
> >             consumer = null;
> >         }
> >     }
> >
> >     public final void put(T record) throws InterruptedException {
> >         queue.put(record);
> >     }
> >
> >     protected abstract void consume(BlockingQueue<T> queue,
> >                                     RecordWriter<T> writer)
> >         throws InterruptedException;
> > }
> >
> > class ContinuousWriteBehind<T> extends AbstractWriteBehind<T> {
> >     ContinuousWriteBehind(RecordWriter<T> writer) {
> >         super(new LinkedBlockingQueue<T>(), writer);
> >     }
> >
> >     protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
> >             throws InterruptedException {
> >         for (T rec; (rec = q.take()) != null; )
> >             writer.write(Collections.singletonList(rec));
> >     }
> > }
> >
> > class BatchedWriteBehind<T> extends AbstractWriteBehind<T> {
> >     private final int maxBuf;
> >     private final List<T> buf;
> >     private final long time;
> >     private final TimeUnit unit;
> >
> >     BatchedWriteBehind(RecordWriter<T> writer, int capacity, int maxBuf,
> >                        long time, TimeUnit unit) {
> >         super(new ArrayBlockingQueue<T>(capacity), writer);
> >         this.maxBuf = maxBuf;
> >         this.buf = new ArrayList<T>(maxBuf);
> >         this.time = time;
> >         this.unit = unit;
> >     }
> >
> >     protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
> >             throws InterruptedException {
> >         for (T rec; (rec = q.take()) != null; ) {
> >             buf.add(rec);
> >             while (buf.size() < maxBuf && (rec = q.poll(time, unit)) != null)
> >                 buf.add(rec);
> >             writer.write(buf);
> >             buf.clear();
> >         }
> >     }
> > }
> >
> >  --tim
> >
>

_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest




Re: implementing a DB write-behind algorithm

Richie.Jefts

The ContinuousWriteBehind does not block producers, since it uses an unbounded queue. You could change the BatchedWriteBehind to also use an unbounded queue so producers do not wait until writes are complete (switch the ArrayBlockingQueue to a LinkedBlockingQueue). In this case the queues will continue to grow while writes are happening. But do you really want this behavior? If writes are always slower than producers, you'll eventually hit out-of-memory problems.
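To make that trade-off explicit rather than discovering it as an OutOfMemoryError, a hedged sketch (the class and method names below are illustrative, not from the thread) keeps the front queue bounded and names the overflow policy:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// A bounded front-end that makes the overflow policy explicit instead of
// letting an unbounded queue grow without limit.
public class BoundedFrontEnd<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    public BoundedFrontEnd(int capacity) {
        this.queue = new ArrayBlockingQueue<T>(capacity);
    }

    // Blocking policy: the producer waits until a writer catches up.
    public void putBlocking(T record) throws InterruptedException {
        queue.put(record);
    }

    // Dropping policy: wait briefly for space, then discard and count the loss.
    public boolean offerOrDrop(T record, long timeoutMillis)
            throws InterruptedException {
        if (queue.offer(record, timeoutMillis, TimeUnit.MILLISECONDS))
            return true;
        dropped.incrementAndGet();
        return false;
    }

    public long droppedCount() { return dropped.get(); }

    public T take() throws InterruptedException { return queue.take(); }

    public static void main(String[] args) throws InterruptedException {
        BoundedFrontEnd<String> front = new BoundedFrontEnd<>(1);
        front.putBlocking("a");
        System.out.println(front.offerOrDrop("b", 10)); // false: queue full, record dropped
        System.out.println(front.droppedCount());       // 1
    }
}
```

Either way something gives, as richie says; this just makes the choice (block vs. drop) visible and measurable instead of implicit.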

richie



"Alexandru Popescu" <[hidden email]>
04/30/2006 02:53 PM
To: "[hidden email]" <[hidden email]>
cc: [hidden email], [hidden email], evo1 <[hidden email]>, "Tim Peierls" <[hidden email]>
Subject: Re: [concurrency-interest] implementing a DB write-behind algorithm

On 4/30/06, Richie.Jefts@... <Richie.Jefts@...> wrote:

If new jobs are queued faster than the writers can write, there is not a whole lot you can do. Something has to give. You can have the producers wait until the writer is ready (which currently happens), have the producers use a CallerRunsPolicy, or throw away the produced item.

Swapping the queues won't solve the fundamental issue that producers are faster than consumers.

Sorry, but I am missing the reason why... the producers will continue to post on the empty queue, while the writer(s) will continue to write. Balancing the two will keep the producers from blocking.


You could also make the writers use a threadpool if writes can happen concurrently.

I missed mentioning this part: yes writes can happen concurrently.

./alex
--
.w( the_mindstorm )p.


[... richie's batching sketch and the earlier quoted messages snipped; quoted in full earlier in the thread ...]





Re: implementing a DB write-behind algorithm

Alexandru Popescu ☀
On 4/30/06, [hidden email] <[hidden email]> wrote:

The ContinuousWriteBehind does not block producers, since it uses an unbounded queue. You could change the BatchedWriteBehind to also use an unbounded queue so producers do not wait until writes are complete (switch the ArrayBlockingQueue to a LinkedBlockingQueue). In this case the queues will continue to grow while writes are happening. But do you really want this behavior? If writes are always slower than producers, you'll eventually hit out-of-memory problems.

richie

Thanks Richie. Probably I need to clarify one thing: a "write" operation is slower than a "put" operation (a put op means adding some data to a queue, while a write op means persisting it in a DB). But this doesn't imply that the system will hit its limits, because you can have multiple writers (still, I agree with you that theoretically it may happen if no good balance can be computed).
Also, the frequency of the "put" ops fluctuates over time, while the writes can be continuous.

hope this clarifies some of the points I've missed,

./alex
--
.w( the_mindstorm )p.


[... earlier quoted messages and Tim's code sample snipped; quoted in full earlier in the thread ...]


Re: implementing a DB write-behind algorithm

Alexandru Popescu ☀
In reply to this post by tpeierls
One more question about the code sample: what is the
[code]@GuardedBy("this")[/code] annotation responsible for? Does it mark
that any operations on that field must be synchronized?

./alex
--
.w( the_mindstorm )p.


On 4/30/06, Tim Peierls <[hidden email]> wrote:

> [... Tim's reply and code sample snipped; quoted in full earlier in the thread ...]


Re: implementing a DB write-behind algorithm

tpeierls
Short answer: Yes.

Longer answer:
<shamelessPlug>
We introduce the @GuardedBy annotation in Java Concurrency in Practice. @GuardedBy(lock) documents that a field or method should be accessed only with the specific lock held. In particular, @GuardedBy("this") means that all accesses must be done while holding the intrinsic lock of the containing object. Documenting the synchronization policy of a class through such annotations makes it easier to reason about its correctness and easier to maintain the class without breaking it.

All non-volatile, non-final fields of a class that can be accessed by multiple threads must be guarded in this way. In that code example, the start, stop, and isRunning methods all access the consumer field. I would have made the field volatile, but the first two methods are check-then-act sequences that must be performed atomically with respect to each other, so volatile isn't enough in this case.
</shamelessPlug>

Regarding your concern about polling from the database writer thread:

BlockingQueue is generally the appropriate interface to use when dealing with producer-consumer designs, because it gives you the most flexibility in dealing with overproduction (producers adding work faster than consumers can process it) or underproduction (consumers processing work faster than producers can supply them). 
It's perfectly reasonable for a consumer thread to loop calling take() and timed poll(). You normally don't want to spin in an untimed poll() loop -- but that's what you'd be forced to do with a ConcurrentLinkedQueue.

As Richie Jefts hinted, you should be prepared to handle saturation from the upstream (producer) end. You have two main choices: use a timed queue.offer() behind the scenes and throw an application-level checked exception on timeout -- forcing clients to catch that checked exception -- or simply use an untimed queue.put() and document that WriteBehind.put can block indefinitely by putting "throws InterruptedException" in the signature, as in the sample code.
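For illustration, the timed variant might be sketched like this; the class name, the `SaturationException` type, and the capacity/timeout values are hypothetical, not from the thread -- the only library facts relied on are BlockingQueue's timed offer and untimed poll:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical application-level checked exception, thrown when the
// write-behind queue stays full for the whole timeout.
class SaturationException extends Exception {
    SaturationException(String msg) { super(msg); }
}

class BoundedWriteBehind<T> {
    private final BlockingQueue<T> queue;
    private final long timeout;
    private final TimeUnit unit;

    BoundedWriteBehind(int capacity, long timeout, TimeUnit unit) {
        this.queue = new ArrayBlockingQueue<T>(capacity);
        this.timeout = timeout;
        this.unit = unit;
    }

    // Timed offer: blocks up to the timeout, then reports saturation
    // to the caller instead of blocking indefinitely.
    public void put(T record) throws InterruptedException, SaturationException {
        if (!queue.offer(record, timeout, unit))
            throw new SaturationException("write-behind queue full");
    }

    // Non-blocking drain hook for a consumer; returns null when empty.
    public T poll() { return queue.poll(); }
}
```

A consumer thread would drain this with take()/timed poll() exactly as in the sample code above; only the producer-side policy differs.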

--tim



On 5/1/06, Alexandru Popescu <[hidden email]> wrote:
One more question about the code sample: what is the
[code]@GuardedBy("this")[/code] annotation responsible for? Does it mark
that any operation on that field must be synchronized?

./alex
--
.w( the_mindstorm )p.


On 4/30/06, Tim Peierls <[hidden email]> wrote:

> On 4/30/06, Alexandru Popescu
> <[hidden email] > wrote:
>
> > I firstly have to confess that when getting to concurrency related
> > problems, I am getting confused quite quickly :-).
>
>
> You're not alone! :-)
>
>
> > [snip]
>
> The use of BlockingQueue.put makes it possible to implement strategies that
> make the caller block, while still permitting strategies that don't block.
> So I would avoid ConcurrentLinkedQueue here, because it does not implement
> the BlockingQueue interface.
>
> You can use an unbounded LinkedBlockingQueue for continuous write-behind,
> and ArrayBlockingQueue (always bounded) for batched write-behind. Instead of
> swapping in a new queue, the consumer thread could just poll until the batch
> size was reached (using a timeout to avoid the risk of batches never
> completing), and then send the batch. The batch size need not be the same as
> the queue capacity.
>
> Here's an uncompiled, untested fragment that illustrates the idea:
>
> [snip code sample]
>
>  --tim
>


Re: implementing a DB write-behind algorithm

Brian Goetz
In reply to this post by Alexandru Popescu ☀
> One more question about the code sample: what is the
> [code]@GuardedBy("this")[/code] annotation responsible for? Does it mark
> that any operation on that field must be synchronized?

This is an annotation we introduce in the book.

@ThreadSafe
public class Counter {
   @GuardedBy("this") private int value;

   public synchronized int getNext() { return value++; }
}

In this case, we say value is guarded-by 'this' because we achieve
thread safety by ensuring that whenever a thread accesses value, it must
hold the Counter lock.  It's a way of documenting your synchronization
policy, so that when the class is extended/maintained, you know how the
locks are used to ensure thread safety and therefore are less likely to
break it.

Re: implementing a DB write-behind algorithm

studdugie
In reply to this post by Alexandru Popescu ☀
Another option is to use a CLQ w/ an AtomicBoolean or (now that I
think about it) an AtomicInteger (AI).

The logic goes like this: a thread has a database write, so it increments
the AtomicInteger and compares the result against a guard constant
(MAX_CONCURR_WRITERS). If AI <= MAX_CONCURR_WRITERS, the thread enters the
write loop and drains the CLQ.

The code may look something like this.

void dbPut(DbData data)
{
    dbWriteCLQ.offer( data );

    try
    {
        if(AI.incrementAndGet() <= MAX_CONCURR_WRITERS )
            while(null !=(data = dbWriteCLQ.poll()))
                doDatabaseWrite( data );
    }
    finally
    {
        AI.decrementAndGet();
    }
}

I call it the hijack approach because instead of having one or more
specialist threads to do the job you use a CAS to "hijack" any thread
that happens to be passing by at the wrong (or right) place at the
wrong (or right) time.

Like I said in the first paragraph, you could also use an
AtomicBoolean to do the hijacking (CAS) but that limits you to one
thread.

Obviously hijacking isn't the best solution all the time, because
you've pulled a thread away from its normal flow and that flow may be
time sensitive (e.g. responding to an HTTP request). But if you have
complete control over every thread that can get hijacked, you may be
able to get away w/ it. Your best bet (as always) is to slap a profiler
on the code w/ varying loads after you've determined your throughput
minimum.
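A self-contained sketch of this hijack pattern, following the names in the fragment above (the List-backed doDatabaseWrite is a stand-in for a real database write, added only so the sketch runs on its own):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

class HijackWriter {
    private static final int MAX_CONCURR_WRITERS = 2;

    private final ConcurrentLinkedQueue<String> dbWriteCLQ =
        new ConcurrentLinkedQueue<String>();
    private final AtomicInteger writers = new AtomicInteger(0);

    // Stand-in "database": thread-safe list of everything written.
    final List<String> written = new CopyOnWriteArrayList<String>();

    void dbPut(String data) {
        dbWriteCLQ.offer(data);
        try {
            // If we're within the writer limit, this caller is "hijacked"
            // to drain whatever is currently on the queue -- possibly
            // including records offered by other threads.
            if (writers.incrementAndGet() <= MAX_CONCURR_WRITERS)
                while ((data = dbWriteCLQ.poll()) != null)
                    doDatabaseWrite(data);
        } finally {
            writers.decrementAndGet();
        }
    }

    private void doDatabaseWrite(String data) { written.add(data); }
}
```

With a single calling thread each dbPut drains its own record immediately; under contention, whichever threads slip under the limit drain everyone's records.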

Love, peace, and hair grease,

Dane

On 4/30/06, Alexandru Popescu <[hidden email]> wrote:

> [snip]


Re: implementing a DB write-behind algorithm

Alexandru Popescu ☀
This is gonna be stupid, but I've seen it used in different places
(source code included), and couldn't find a definition [blushing/]:
CAS=Compare-and-swap?

./alex
--
.w( the_mindstorm )p.


On 5/2/06, studdugie <[hidden email]> wrote:

> [snip]


Re: implementing a DB write-behind algorithm

Joe Bowbeer
Here's a good reference:

http://www-128.ibm.com/developerworks/java/library/j-jtp11234/

Compare and swap is the original name, apparently, though I think of
it as "compare and set", or test and set.

It's an instruction on some machines that could be used to atomically
twiddle bits in memory (including hardware device registers).
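As a quick illustration (not from the thread), java.util.concurrent exposes the instruction through the atomic classes, e.g. AtomicInteger.compareAndSet:

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    public static void main(String[] args) {
        // compareAndSet(expected, update) atomically stores `update`
        // only if the current value equals `expected`, and reports
        // whether this caller won the race.
        AtomicInteger value = new AtomicInteger(0);

        boolean first = value.compareAndSet(0, 5);   // wins: 0 -> 5
        boolean second = value.compareAndSet(0, 7);  // loses: value is 5, not 0

        System.out.println(first + " " + second + " " + value.get());
        // prints: true false 5
    }
}
```

Lock-free structures like ConcurrentLinkedQueue are built from exactly this retry-until-the-CAS-wins loop.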

On 5/2/06, Alexandru Popescu <[hidden email]> wrote:

> This is gonna be stupid, but I've seen it used in different places
> (source code included), and couldn't find a definition [blushing/]:
> CAS=Compare-and-swap?
>
> ./alex
> --
> .w( the_mindstorm )p.
>
>
> On 5/2/06, studdugie <[hidden email]> wrote:
> > [snip]


Re: implementing a DB write-behind algorithm

studdugie
In reply to this post by Alexandru Popescu ☀
> CAS=Compare-and-swap?
Yes.

Hijacking is nothing new. As a matter of fact, you'll be hard pressed
to find anything totally new in computer science. I just liked the
term _hijacking_ as it applies to this specific case, plus I didn't see
the technique offered in any of the other responses to your post.
Apologies if I missed it.

On 5/2/06, Alexandru Popescu <[hidden email]> wrote:

> [snip]
