RabbitMQ consumer with C#

In the previous post I talked about a RabbitMQ publisher and how we can implement it with C#.

The next step is to implement a consumer, and we'll see how to do that efficiently. Unlike the publisher, the consumer has more implications: as we'll see, messages can be consumed from RabbitMQ in different ways.

The final step will be to use the consumer in a higher-level service that exposes methods to consume messages.

Interfaces

As we did for the publisher, we need to define a few interfaces:


public interface IRabbitMessageHandler
{
    List<Payload> Payloads { get; set; }
    void Handle(BasicGetResult result);
    void Handle(object model, BasicDeliverEventArgs result);
}

public interface IRabbitConsumerSetup : IDisposable
{
    IRabbitConsumer Setup(RabbitConfiguration rabbitConfiguration);
}

public interface IRabbitConsumer : IRabbitConsumerSetup
{
    void Get(IRabbitMessageHandler messageHandler);
    void Consume(IRabbitMessageHandler messageHandler);
}

I provide a handler that deals with the messages received from RabbitMQ.

Every implementation of the message handler has a list of the payloads received from the server and two methods to handle messages: the first for messages retrieved with a BasicGet operation, the second for messages delivered through a subscription.

The two approaches behave very differently. With a get, the consumer is responsible for checking whether there are new messages on the server and retrieving them. With a subscription, the RabbitMQ server uses the AMQP protocol to push new messages to the subscribed consumers. The second approach is more efficient and is the one recommended by the RabbitMQ documentation.
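To make the difference concrete, here is a minimal sketch that contrasts the two models. It does not use the RabbitMQ client at all: InMemoryQueue, TryGet, Subscribe, Publish and Drain are hypothetical names invented for this illustration, and the queue is a plain in-memory stand-in for the broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a broker queue; this is NOT the RabbitMQ API.
public class InMemoryQueue
{
    private readonly Queue<string> _messages = new Queue<string>();
    private Action<string> _subscriber;

    // Pull model: the caller asks for the next message; null means the queue is empty.
    public string TryGet()
    {
        return _messages.Count > 0 ? _messages.Dequeue() : null;
    }

    // Push model: register a callback once; the queue invokes it for every message.
    public void Subscribe(Action<string> onMessage)
    {
        _subscriber = onMessage ?? throw new ArgumentNullException(nameof(onMessage));

        // Deliver anything that was already waiting.
        while (_messages.Count > 0)
            _subscriber(_messages.Dequeue());
    }

    public void Publish(string message)
    {
        if (_subscriber != null)
            _subscriber(message);        // pushed to the subscriber immediately
        else
            _messages.Enqueue(message);  // stored until someone pulls it
    }
}

public static class PullVersusPushDemo
{
    // Pull: the consumer keeps polling until the queue reports it is empty.
    public static List<string> Drain(InMemoryQueue queue)
    {
        var received = new List<string>();
        string message;
        while ((message = queue.TryGet()) != null)
            received.Add(message);
        return received;
    }

    public static void Main()
    {
        var pullQueue = new InMemoryQueue();
        pullQueue.Publish("a");
        pullQueue.Publish("b");
        Console.WriteLine(string.Join(",", Drain(pullQueue))); // prints "a,b"

        var pushQueue = new InMemoryQueue();
        var pushed = new List<string>();
        pushQueue.Subscribe(pushed.Add);
        pushQueue.Publish("c");                                // delivered without polling
        Console.WriteLine(string.Join(",", pushed));           // prints "c"
    }
}
```

In the pull case the consumer has to come back and ask again to see later messages; in the push case it registers once and does nothing until a message arrives.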

All consumers have to implement these methods: the Setup method to define the consumer parameters, the Get method to poll the queue, and the Consume method to register the consumer with a specific handler.

Models

The implementation of the message handler looks like this:


public class RabbitMessageHandler : IRabbitMessageHandler
{
    public List<Payload> Payloads { get; set; } = new List<Payload>();

    // Handles a message retrieved with BasicGet.
    // Note: in RabbitMQ.Client 6.x Body is a ReadOnlyMemory<byte>; use result.Body.ToArray() there.
    public void Handle(BasicGetResult result)
    {
        Payloads.Add(new Payload()
        {
            Body = Encoding.UTF8.GetString(result.Body),
            MessageId = result.BasicProperties.MessageId,
            CorrelationId = result.BasicProperties.CorrelationId,
            ReplyTo = result.BasicProperties.ReplyTo
        });
    }

    // Handles a message pushed by the server to a subscribed consumer.
    public void Handle(object model, BasicDeliverEventArgs result)
    {
        Payloads.Add(new Payload()
        {
            Body = Encoding.UTF8.GetString(result.Body),
            MessageId = result.BasicProperties.MessageId,
            CorrelationId = result.BasicProperties.CorrelationId,
            ReplyTo = result.BasicProperties.ReplyTo
        });
    }
}

The handler has two methods: one for a message that arrives from a get (BasicGetResult) and one for a message that arrives from a subscription (BasicDeliverEventArgs).

In this implementation the operation is the same in both cases: add a new Payload object to the list of payloads.

Now we can use the handler in the consumer:


public class DefaultRabbitConsumer : IRabbitConsumer
{
    private RabbitConfiguration _rabbitConfiguration;
    private IConnection _connection;
    private IModel _channel;
    private EventingBasicConsumer _consumer;

    public IRabbitConsumer Setup(RabbitConfiguration rabbitConfiguration)
    {
        _rabbitConfiguration = rabbitConfiguration;
        _connection = _rabbitConfiguration.ConnectionFactory.CreateConnection();
        _channel = _connection.CreateModel();

        // Declare the exchange and the queue (autoDelete: false, exclusive: false), then bind them.
        _channel.ExchangeDeclare(_rabbitConfiguration.Exchange, _rabbitConfiguration.Type, _rabbitConfiguration.Durable, false);
        _channel.QueueDeclare(_rabbitConfiguration.Queue, _rabbitConfiguration.Durable, false, false, null);
        _channel.QueueBind(_rabbitConfiguration.Queue, _rabbitConfiguration.Exchange, _rabbitConfiguration.RoutingKey);
        return this;
    }

    public void Get(IRabbitMessageHandler messageHandler)
    {
        if (_rabbitConfiguration == null)
            throw new ApplicationException("Rabbit configuration is missing.");

        if (messageHandler == null)
            throw new ArgumentNullException(nameof(messageHandler));

        // Poll with autoAck: true until BasicGet returns null, which means the queue is empty.
        var result = _channel.BasicGet(_rabbitConfiguration.Queue, true);
        while (result != null)
        {
            messageHandler.Handle(result);
            result = _channel.BasicGet(_rabbitConfiguration.Queue, true);
        }
    }

    public void Consume(IRabbitMessageHandler messageHandler)
    {
        if (_rabbitConfiguration == null)
            throw new ApplicationException("Rabbit configuration is missing.");

        if (messageHandler == null)
            throw new ArgumentNullException(nameof(messageHandler));

        _consumer = new EventingBasicConsumer(_channel);
        _consumer.Received += (model, result) =>
        {
            // Acknowledge the message only after the handler has processed it.
            messageHandler.Handle(model, result);
            _channel.BasicAck(result.DeliveryTag, false);
        };

        // autoAck: false, because the acknowledgement is sent manually above.
        _channel.BasicConsume(_rabbitConfiguration.Queue, false, _consumer);
    }

    public void Dispose()
    {
        // Cancel the subscription, if any, before closing the channel.
        if (_channel?.IsOpen == true && _consumer != null)
        {
            _channel.BasicCancel(_consumer.ConsumerTag);
        }

        _channel?.Dispose();
        _connection?.Dispose();
    }
}

The Get method retrieves all the messages from the queue until it is empty. It is the least efficient way to receive messages and is not recommended by RabbitMQ, but it can be useful in some particular cases.

The Consume method consumes messages from a queue with a specific handler. This is the most efficient way in terms of performance because it leverages the AMQP protocol and the push delivery of the RabbitMQ server. Each message is acknowledged only after the handler has processed it.

We also implement the Dispose method, exposed to the caller, which cancels the subscription if one is active and disposes the channel and the connection.

Service

In the RabbitService implemented in the previous post we can add these methods:


public class RabbitService<TPublisher, TConsumer>
    where TPublisher : class, IRabbitPublisherSetup, new()
    where TConsumer : class, IRabbitConsumerSetup, new()
{
    private readonly ConnectionFactory _connectionFactory;
    private readonly ConcurrentDictionary<Guid, TConsumer> _consumers = new ConcurrentDictionary<Guid, TConsumer>();

    public RabbitService(ConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    .....

    public void Get(string exchange, string queue, IRabbitMessageHandler messageHandler, string routingKey = "", string type = "fanout", bool durable = false)
    {
        var rabbitConfiguration = new RabbitConfiguration(_connectionFactory, exchange, routingKey, type, durable, queue);

        // The consumer is short-lived: it is disposed as soon as the queue has been drained.
        using (var messageConsumer = new TConsumer())
        {
            messageConsumer.Setup(rabbitConfiguration).Get(messageHandler);
        }
    }

    public Guid Subscribe(string exchange, string queue, IRabbitMessageHandler messageHandler, string routingKey = "", string type = "fanout", bool durable = false)
    {
        var rabbitConfiguration = new RabbitConfiguration(_connectionFactory, exchange, routingKey, type, durable, queue);
        var consumer = new TConsumer();
        var consumerId = Guid.NewGuid();

        // Keep the consumer alive and return its id so that the caller can unsubscribe later.
        _consumers.TryAdd(consumerId, consumer);
        consumer.Setup(rabbitConfiguration).Consume(messageHandler);

        return consumerId;
    }

    public void Unsubscribe(Guid consumerId)
    {
        // Remove first, then dispose: an unknown id is ignored instead of throwing KeyNotFoundException.
        if (_consumers.TryRemove(consumerId, out TConsumer consumer))
        {
            consumer.Dispose();
        }
    }
}

The Get and Subscribe methods use the consumer through its fluent syntax: Setup returns the consumer itself, so the call can be chained with Get or Consume.

As the service is a singleton, I use a ConcurrentDictionary to store all the consumers that have been instantiated. In this way I can implement an Unsubscribe method that unregisters a specific consumer.
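The bookkeeping idea can be tried in isolation, without a broker. The following is only a sketch: FakeConsumer and SubscriptionRegistry are hypothetical names that are not part of the project, and the removal uses TryRemove so that an unknown id is simply ignored.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a consumer; it only records whether it was disposed.
public class FakeConsumer : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

// Hypothetical registry: one Guid per live subscription.
public class SubscriptionRegistry<TConsumer> where TConsumer : class, IDisposable
{
    private readonly ConcurrentDictionary<Guid, TConsumer> _consumers =
        new ConcurrentDictionary<Guid, TConsumer>();

    public int Count => _consumers.Count;

    // Stores the consumer and returns the id the caller needs to remove it later.
    public Guid Add(TConsumer consumer)
    {
        var id = Guid.NewGuid();
        _consumers.TryAdd(id, consumer);
        return id;
    }

    // Removes and disposes the consumer; returns false for an unknown or already removed id.
    public bool Remove(Guid id)
    {
        if (!_consumers.TryRemove(id, out var consumer))
            return false;

        consumer.Dispose();
        return true;
    }
}

public static class RegistryDemo
{
    public static void Main()
    {
        var registry = new SubscriptionRegistry<FakeConsumer>();
        var consumer = new FakeConsumer();
        var id = registry.Add(consumer);

        Console.WriteLine(registry.Remove(id)); // prints "True"
        Console.WriteLine(consumer.Disposed);   // prints "True"
        Console.WriteLine(registry.Remove(id)); // prints "False": the id is no longer registered
    }
}
```

Because TryRemove hands the consumer to exactly one caller, two threads that unsubscribe the same id at the same time cannot both dispose it.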

Now we can change the registration of the service in the Autofac module like this:

public class MessagingCoreModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        .....
        builder.RegisterType<RabbitService<DefaultRabbitPublisher, DefaultRabbitConsumer>>().SingleInstance();
    }
}

You can find the project here.
