Reliable RabbitMQ consumer with C#

Message brokers such as RabbitMQ are a common way to decouple the communication between two applications by placing an intermediary between them.

This approach brings many benefits: asynchronous communication, independent protocols between the applications, configurable rules for routing messages and, in the case of RabbitMQ, an efficient protocol (AMQP) for handling a large number of messages in a short time.

Like every solution, it has pros and cons, and one possible problem is the single point of failure: if we have a single server instance and it shuts down, none of the applications involved can communicate, with all the consequences that entails.

Fortunately, RabbitMQ provides a clustering solution so that multiple nodes are replicated with each other in real time. The RabbitMQ website has detailed documentation on how to configure a cluster, but that is not the topic of this post; instead, I’ll talk about how we can adapt our code to manage the communication with a RabbitMQ cluster.

Option 1 – Endpoint resolver factory

The RabbitMQ.Client connection factory has an EndpointResolverFactory property that we can set to a function returning a custom implementation of the IEndpointResolver interface.

So first of all we need to implement this interface:


using System.Collections.Generic;
using System.Configuration;
using RabbitMQ.Client;

public class EndPointResolver : IEndpointResolver
{
    // Returns every cluster node the client is allowed to connect to.
    public IEnumerable<AmqpTcpEndpoint> All()
    {
        return new List<AmqpTcpEndpoint>()
        {
            new AmqpTcpEndpoint()
            {
                HostName = ConfigurationManager.ConnectionStrings["RabbitMQHostname1"].ConnectionString
            },
            new AmqpTcpEndpoint()
            {
                HostName = ConfigurationManager.ConnectionStrings["RabbitMQHostname2"].ConnectionString
            }
        };
    }
}

Then, in the Autofac module, we can set up the function that returns an instance of this class:


public class MessagingCoreModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        var connectionFactory = new ConnectionFactory
        {
            UserName = ConfigurationManager.ConnectionStrings["RabbitMQUsername"].ConnectionString,
            Password = ConfigurationManager.ConnectionStrings["RabbitMQPassword"].ConnectionString,
            RequestedHeartbeat = 10,
            // Let the client reconnect on its own, retrying every 10 seconds.
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
        };

        // No HostName is set above: the resolver supplies the cluster nodes.
        connectionFactory.EndpointResolverFactory = (amqpTcpEndpoints) => new EndPointResolver();

        // Share a single factory instance across the application.
        builder.Register(c => connectionFactory).SingleInstance();
        .....
    }
}

Note that we don’t set the hostname on the ConnectionFactory: the EndpointResolverFactory function is used to resolve the hostnames instead.

Option 2 – Connection parameters

The second option is to pass the list of AmqpTcpEndpoint instances directly when the connection is initialized.

We can create a static class of constants:


public static class Constants
{
    .....
    public static readonly List<AmqpTcpEndpoint> AmqpTcpEndpoints = new List<AmqpTcpEndpoint>()
    {
        new AmqpTcpEndpoint()
        {
            HostName = ConfigurationManager.AppSettings["RabbitMQHostname1"]
        },
        new AmqpTcpEndpoint()
        {
            HostName = ConfigurationManager.AppSettings["RabbitMQHostname2"]
        }
    };
}

And then, in the publisher/consumer classes we use the constant in the CreateConnection method:


public class RabbitPublisher : IRabbitPublisher
{
    .....

    public void Publish(IEnumerable<Payload> payloads)
    {
        // Pass the endpoint list defined in the Constants class; the client connects to one of these nodes.
        using (var connection = _rabbitConfiguration.ConnectionFactory.CreateConnection(Constants.AmqpTcpEndpoints))
        using (var channel = connection.CreateModel())
        {
            .....
        }
    }
}

Exception management

Last but not least, in the publisher/consumer classes we need to take care of possible connection failures when one of the nodes goes down.

In this phase RabbitMQ deals with requeuing the non-acked messages on the other cluster nodes, and the .NET RabbitMQ client automatically switches to one of the other active nodes. On the client side, however, we could receive IOExceptions for the messages that cannot be published or received during this transition.

So what we need to do is simply put the BasicPublish and BasicAck operations in a try/catch:


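channel.ExchangeDeclare(_rabbitConfiguration.Exchange, _rabbitConfiguration.Type, _rabbitConfiguration.Durable, false, null);
var i = 0;
while (i < payloads.Count())
{
    .....
    try
    {
        channel.BasicPublish(_rabbitConfiguration.Exchange, _rabbitConfiguration.RoutingKey, true, properties, body);
        // Move on to the next payload only after a successful publish.
        i++;
    }
    catch (Exception ex)
    {
        // Log exception. Since i is not incremented, the same payload is
        // retried on the next iteration, immediately and without a limit.
    }
}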


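_consumer.Received += (model, result) =>
{
    try
    {
        // Handle the message first and acknowledge it only afterwards, so that
        // a failing handler does not leave an acked but unprocessed message.
        messageHandler.Handle(model, result);
        _channel.BasicAck(result.DeliveryTag, false);
    }
    catch (Exception ex)
    {
        //Log exception
    }
};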
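
The publish loop above retries a failed message immediately and without a limit, so it can keep spinning for as long as the node is unreachable. As a minimal sketch of one alternative, assuming we are happy to give up after a few attempts, the retry could be moved into a small helper that waits a little longer after each failure. RetryHelper and TryWithBackoff are hypothetical names, not part of RabbitMQ.Client or of the original project, and the attempt count and delays are placeholders to tune:

```csharp
using System;
using System.Threading;

public static class RetryHelper
{
    // Runs 'operation' up to 'maxAttempts' times.
    // After each failed attempt (except the last) it sleeps, doubling the
    // wait every time, starting from 'initialDelay'.
    // Returns true if the operation succeeded, false if every attempt failed.
    public static bool TryWithBackoff(
        Action operation,
        int maxAttempts,
        TimeSpan initialDelay,
        Action<Exception> onError = null)
    {
        if (operation == null) throw new ArgumentNullException(nameof(operation));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        var delay = initialDelay;
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                operation();
                return true;
            }
            catch (Exception ex)
            {
                // Give the caller a chance to log the failure.
                onError?.Invoke(ex);

                if (attempt == maxAttempts) break;

                Thread.Sleep(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }

        return false;
    }
}
```

In the publisher, the BasicPublish call would be wrapped in a lambda and passed as the operation, with the logging moved into onError. A false result is then the signal that the payload could not be sent and has to be stored or reported somewhere else.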

You can find the entire project here.
