How to process messages in parallel using EasyNetQ

The IBus interface from EasyNetQ framework has SubscribeAsync() method which allows easily to span message handling between different threads. It uses standard TPL Tasks for delegating handler execution. Depending on the application, you may create Tasks with LongRunning flag, which provides a hint to the scheduler that additional thread may be required1 and it should avoid using a ThreadPool2. Below example shows how to register parallel consumer:

var bus = RabbitHutch.CreateBus("host=localhost");

bus.SubscribeAsync("sub1", message => Task.Factory.StartNew(() =>
{
// long running message handler
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}, TaskCreationOptions.LongRunning));

What if there are thousands messages in the queue

Of course we do not want a situation where the application is flooded with messages and consumes all the resources. We want to be in control and be able to specify how many concurrent tasks to run. Fortunately it is very easy to do.

Prefetch count

RabbitMQ allows clients to specify prefetch count. It is a value which determines how many messages are sent to the client and cached by RabbitMQ client library. We can use this value to control number of concurrent tasks. The prefetch count can be set in EasyNetQ through bus’ connection string. The default value is 50 and the maximum value is 65535 (UInt16.MaxValue).

Below example shows how to set number of concurrent tasks to 15 by using prefetchcount:

var bus = RabbitHutch.CreateBus("host=localhost;prefetchcount=15");

bus.SubscribeAsync("sub1", message => Task.Factory.StartNew(() =>
{
// long running message handler
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}, TaskCreationOptions.LongRunning));

Notice the prefetchcount setting in line 1 and LongRunning hint in line 7.

Although this solution will work fine with long running handlers where the time to deliver messages is negligible compared to the handling time, it is not good in cases where handling time is small and pre fetching messages may increase general performance of the system.

TaskScheduler

Fortunately there is natural way to control how TPL dispatches work to threads. The task scheduler class is responsible for queuing tasks onto thread. By using a specially crafted task scheduler we can control number of concurrent threads used for handling messages. Microsoft even has a How To article showing LimitedConcurrencyLevelTaskScheduler which does exactly that. Using task scheduler provided in that article we can easily set up concurrency level to 10 and pre fetch count to 15:

var bus = RabbitHutch.CreateBus("prefetchcount=15;host=localhost");

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(10);
TaskFactory factory = new TaskFactory(lcts);

bus.SubscribeAsync("sub1", message => factory.StartNew(() =>
{
// long running message handler
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}, TaskCreationOptions.LongRunning));

  1. http://msdn.microsoft.com/en-us/library/dd997402.aspx 
  2. Actually in .NET framework 4 it always creates new Thread
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s