RabbitMQ publisher with C#

RabbitMQ is one of the most popular message brokers, used to decouple the communication between different applications and components.

It has nice, clear documentation with simple tutorials that explain the capabilities of the middleware; it implements the AMQP protocol and offers clients for different environments and languages, like C#, Java, Python, PHP, JavaScript and so on.

My company had no message broker until now, so after some research I decided to adopt this awesome system as the communication layer between our applications. To make it easy to use and to keep the applications decoupled from the broker itself, I implemented a layer with some functionality that I want to discuss in this post.

The project is organized into two main parts, a publisher and a consumer; in this post I’ll start with the first one.

Interfaces

In order to develop a clean solution, I start by defining a few interfaces.

The first thing that I need is an interface for the Rabbit configuration:

using RabbitMQ.Client;

public interface IRabbitConfiguration
{
    ConnectionFactory ConnectionFactory { get; }
    string Exchange { get; }
    string Queue { get; }
    string RoutingKey { get; }
    string Type { get; }
    bool Durable { get; }
}

These are the main components that compose the RabbitMQ ecosystem.

The ConnectionFactory is used to define the parameters and establish a connection to the Rabbit server; we’ll define that later.

The exchange dispatches the messages. Subscribers bind queues to the exchange in order to receive certain messages, and routing keys can be used by publishers to define routes and by subscribers to receive only a subset of the messages.

The type defines how the exchange works (fanout delivers each message to all the bound queues, direct delivers it only to the queues bound with a matching routing key), and the exchange can be declared durable (it survives a server restart) or not.

Anyway, the RabbitMQ documentation about the .NET client is very exhaustive.
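To make the difference between the two exchange types concrete, here is a minimal in-memory sketch of the routing decision. It is only an illustration: the Binding and ExchangeRouting names are hypothetical and are not part of RabbitMQ.Client, and the real broker does this work on the server side.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a queue binding; not part of RabbitMQ.Client.
public sealed class Binding
{
    public Binding(string queue, string bindingKey)
    {
        Queue = queue;
        BindingKey = bindingKey;
    }

    public string Queue { get; }
    public string BindingKey { get; }
}

public static class ExchangeRouting
{
    // Returns the names of the queues that would receive a message
    // published with the given routing key.
    public static List<string> Route(string exchangeType, string routingKey, IEnumerable<Binding> bindings)
    {
        switch (exchangeType)
        {
            case "fanout":
                // Fanout ignores the routing key: every bound queue gets a copy.
                return bindings.Select(b => b.Queue).Distinct().ToList();
            case "direct":
                // Direct delivers only to queues whose binding key equals the routing key.
                return bindings.Where(b => b.BindingKey == routingKey)
                               .Select(b => b.Queue).Distinct().ToList();
            default:
                throw new ArgumentException("Unsupported exchange type: " + exchangeType);
        }
    }
}
```

With two bindings, ("orders", "order.created") and ("audit", "audit"), routing the key "order.created" through a fanout exchange selects both queues, while a direct exchange selects only "orders".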

The other interfaces are for the Rabbit publisher:

public interface IRabbitPublisherSetup
{
    IRabbitPublisher Setup(RabbitConfiguration rabbitConfiguration);
}

public interface IRabbitPublisher : IRabbitPublisherSetup
{
    void Publish(IEnumerable<Payload> payloads);
}

All publishers have to implement IRabbitPublisher, which has two methods: the first, Setup, applies a specific configuration; the second, Publish, accepts a list of messages (payloads).

Notice that Setup returns an IRabbitPublisher; with this approach, which I have discussed in this post, the two calls can be chained as Setup(...).Publish(...), so the configuration is in place before anything is published.
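The chaining can be tried without a broker. The sketch below uses simplified stand-in types (DemoConfiguration, IDemoPublisherSetup, IDemoPublisher and RecordingPublisher are all hypothetical names) and stores the messages in a list instead of sending them. The runtime check in Publish is an extra assumption of mine, not something the original code does.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for RabbitConfiguration, reduced to one field for the example.
public sealed class DemoConfiguration
{
    public DemoConfiguration(string exchange) { Exchange = exchange; }
    public string Exchange { get; }
}

public interface IDemoPublisherSetup
{
    IDemoPublisher Setup(DemoConfiguration configuration);
}

public interface IDemoPublisher : IDemoPublisherSetup
{
    void Publish(IEnumerable<string> bodies);
}

// Records messages in memory instead of talking to a broker.
public sealed class RecordingPublisher : IDemoPublisher
{
    private DemoConfiguration _configuration;

    public List<string> Sent { get; } = new List<string>();

    public IDemoPublisher Setup(DemoConfiguration configuration)
    {
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        return this; // returning the publisher is what allows Setup(...).Publish(...)
    }

    public void Publish(IEnumerable<string> bodies)
    {
        // Assumed extra guard (not in the original code): fail fast if Setup was skipped.
        if (_configuration == null)
            throw new InvalidOperationException("Call Setup before Publish.");

        foreach (var body in bodies)
            Sent.Add(_configuration.Exchange + ":" + body);
    }
}
```

Calling new RecordingPublisher().Setup(new DemoConfiguration("logs")).Publish(new[] { "a", "b" }) records two entries, while calling Publish on a fresh instance throws an InvalidOperationException.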

Models

The first model is the configuration, which implements the IRabbitConfiguration interface:


public class RabbitConfiguration : IRabbitConfiguration
{
    public RabbitConfiguration(ConnectionFactory connectionFactory, string exchange, string routingKey, string type, bool durable, string queue = "")
    {
        ConnectionFactory = connectionFactory;
        Exchange = exchange;
        Queue = queue;
        RoutingKey = routingKey;
        Type = type;
        Durable = durable;
    }

    ......
}

Pretty simple. Now I define the payload model:



public class Payload
{
    public string Label { get; set; }
    public string MessageId { get; set; }
    public string CorrelationId { get; set; }
    public string ReplyTo { get; set; }
    public string Body { get; set; }
}

The payload model can carry a label, a message id, a correlation id (useful for request/reply messages), a ReplyTo property that names the queue where the responder will send the response, and the body of the message.
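As an illustration of how the correlation id and the ReplyTo property work together in a request/reply exchange, here is a small sketch. The PayloadFactory helper and the "request"/"reply" labels are hypothetical and not part of the original project; the Payload class is repeated only so that the snippet compiles on its own.

```csharp
using System;

// Same shape as the Payload model above, repeated so the snippet compiles alone.
public class Payload
{
    public string Label { get; set; }
    public string MessageId { get; set; }
    public string CorrelationId { get; set; }
    public string ReplyTo { get; set; }
    public string Body { get; set; }
}

// Hypothetical helper, not part of the original project.
public static class PayloadFactory
{
    public static Payload CreateRequest(string body, string replyQueue)
    {
        return new Payload
        {
            Label = "request",
            MessageId = Guid.NewGuid().ToString(),
            CorrelationId = Guid.NewGuid().ToString(),
            ReplyTo = replyQueue, // queue where the requester waits for the answer
            Body = body
        };
    }

    public static Payload CreateReply(Payload request, string body)
    {
        return new Payload
        {
            Label = "reply",
            MessageId = Guid.NewGuid().ToString(),
            CorrelationId = request.CorrelationId, // lets the requester match the reply
            Body = body
        };
    }
}
```

The responder would publish the reply to the queue named in request.ReplyTo, and the requester would use the shared correlation id to pair the reply with its original request.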

The last model is the most important, the implementation of the IRabbitPublisher interface:


using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

public class DefaultRabbitPublisher : IRabbitPublisher
{
    private RabbitConfiguration _rabbitConfiguration;

    public IRabbitPublisher Setup(RabbitConfiguration rabbitConfiguration)
    {
        _rabbitConfiguration = rabbitConfiguration;
        // Return the publisher itself so that Setup(...).Publish(...) can be chained.
        return this;
    }

    public void Publish(IEnumerable<Payload> payloads)
    {
        using (var connection = _rabbitConfiguration.ConnectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the exchange (not auto-deleted, no extra arguments).
            channel.ExchangeDeclare(_rabbitConfiguration.Exchange, _rabbitConfiguration.Type, _rabbitConfiguration.Durable, false, null);

            foreach (var payload in payloads)
            {
                var properties = channel.CreateBasicProperties();
                properties.CorrelationId = payload.CorrelationId ?? "";
                properties.ReplyTo = payload.ReplyTo ?? "";

                var body = Encoding.UTF8.GetBytes(payload.Body);
                // mandatory = true: the broker returns messages that it cannot route to any queue.
                channel.BasicPublish(_rabbitConfiguration.Exchange, _rabbitConfiguration.RoutingKey, true, properties, body);
            }
        }
    }
}

As you can see, the Publish method creates the connection and the channel and declares the exchange based on the parameters in the configuration; then, for each payload, it sets the message properties and publishes the message.

With the using statements I ensure that the connection and the channel are disposed correctly at the end of the method.
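The behaviour of stacked using statements can be checked with a small sketch that needs no broker. TrackedResource and UsingDemo are hypothetical stand-ins for the connection and the channel; they only log when they are disposed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a connection or channel; it only logs its disposal.
public sealed class TrackedResource : IDisposable
{
    private readonly string _name;
    private readonly List<string> _log;

    public TrackedResource(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public void Dispose()
    {
        _log.Add("disposed " + _name);
    }
}

public static class UsingDemo
{
    // Returns the sequence of events, optionally simulating a failure while publishing.
    public static List<string> Run(bool fail)
    {
        var log = new List<string>();
        try
        {
            using (var connection = new TrackedResource("connection", log))
            using (var channel = new TrackedResource("channel", log))
            {
                log.Add("publishing");
                if (fail) throw new InvalidOperationException("simulated publish failure");
            }
        }
        catch (InvalidOperationException)
        {
            log.Add("caught");
        }
        return log;
    }
}
```

In both cases the channel is disposed before the connection, that is, in reverse order of creation, and when the simulated failure is thrown both disposals still happen before the exception reaches the catch block.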

Now I’m ready to implement the service that will use these models.

Service

In order to make the service more flexible I define it as a generic class, typed on the publisher:


public class RabbitService<TPublisher> where TPublisher : class, IRabbitPublisher, new()
{
    private readonly ConnectionFactory _connectionFactory;

    public RabbitService(ConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    public void Publish(IEnumerable<Payload> payloads, string exchange, string routingKey = "", string type = "fanout", bool durable = false)
    {
        var rabbitConfiguration = new RabbitConfiguration(_connectionFactory, exchange, routingKey, type, durable);
        var messagePublisher = new TPublisher();
        messagePublisher.Setup(rabbitConfiguration).Publish(payloads);
    }
}

In this method I assemble all the models defined above, so I can set up the configuration and send the messages fluently.

The last step is to register the configuration and the service in an IoC container like Autofac:


public class MessagingCoreModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.Register(c => new ConnectionFactory
        {
            HostName = ConfigurationManager.ConnectionStrings["RabbitMQHostname"].ConnectionString,
            RequestedHeartbeat = 10,
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
        }).SingleInstance();

        builder.RegisterType<RabbitService<DefaultRabbitPublisher>>().SingleInstance();
    }
}

I’ve enabled automatic connection recovery, which detects TCP connection failures; it’s an awesome feature of the RabbitMQ client, very helpful for automatically managing and restoring broken connections.

In the last line I register the typed RabbitService as a single instance.

After that I can use the service, for example in a unit test project, like this:


var containerBuilder = new ContainerBuilder();
containerBuilder.RegisterModule(new MessagingCoreModule());

using (var container = containerBuilder.Build())
{
    var _sut = container.Resolve<RabbitService<DefaultRabbitPublisher>>();
    .....
    _sut.Publish(payloads, exchange, routingKey, type);
}

You can find the source code here.

 
