TransferQueue batch processing

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

TransferQueue batch processing

Carfield Yim

I get code like this as a feed handler:

  private def consume(name: String, callback: (List[Base]) => Array[Array[_]], queue: BlockingQueue[Base]) = {
    val listener = new Thread(new Runnable {
      def run() {
        while (true) {
          var list = new ArrayList[Base]();
          try {
            queue.drainTo(list);
            if (list.size() > 0) {
              var converted = callback(list.toList);
              batchInsert(name, converted);
            } else {
              Thread.sleep(100);
            }
          } catch {
            case e: Exception =>
              logger.error(list.toString(), e);
          }
        }
      }
    }, name);
    listener.start();
  }

  def consume(x: Trade) = {
    tradeQueue.put(x)
  }

It work reasonable good but there are still time that too much update from upstream causing the message blocked and doesn't process fast enough. Recently I come up with this article (http://php.sabscape.com/blog/?p=557) saying that transferqueue may help to reduce the blocking time. However, after I change to use transferqueue and change put() to transfer(), it actually slower the processing time.

I believe the reason is transferqueue.transfer() will face the handler get the message asap, thus queue.drainTo(list) will always draining single element to the list and there won't be batch processing.

Thus, if that mean transfer queue doesn't work for my case? Will it is similar that disruptor also don't target for this kind of batch processing?


_______________________________________________
Concurrency-interest mailing list
[hidden email]
http://cs.oswego.edu/mailman/listinfo/concurrency-interest