Jarloo

RabbitMQ C# Tutorial

RabbitMQ is a message queue similar to ActiveMQ, IBM MQ Series, and Microsoft Message Queue (MSMQ). It’s free, robust, simple to use, and runs on most operating systems.

Getting Started

To get started you will need to download the server and its dependencies. RabbitMQ has a nice Windows bundle that you can grab: RabbitMQ Windows Bundle. If you’re using a different operating system, you can grab what you need from the RabbitMQ Server download page.

Once downloaded, run the installer for Erlang, then install the RabbitMQ Server, then the RabbitMQ DotNet client.

That’s all you need to get things running, so let’s jump into the code!

The full source code for this article is available for download, so go grab a copy and follow along: http://jarloo.com/downloads/rabbitmqtest.zip

Reference the RabbitMQ Client Library for DotNet

To communicate with RabbitMQ you first need to add a reference to the RabbitMQ.Client.dll library that you installed. The default location is C:\Program Files (x86)\RabbitMQ\DotNetClient\bin\RabbitMQ.Client.dll.

RabbitMQ Basics

To talk to RabbitMQ you first need to make a connection, define a channel, and declare an exchange.

private const string EXCHANGE_NAME = "helloworld";

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";

IConnection connection = factory.CreateConnection();
IModel channel = connection.CreateModel();

channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

We use localhost as the host name, which works when RabbitMQ runs on the same machine as your code. If they are on different machines, put the name of the machine running RabbitMQ in place of localhost.

The ExchangeDeclare call is very important to how RabbitMQ works. Your applications will typically all use the same exchange name so they can communicate and share data, but the second parameter is the interesting one. This parameter defines the type of exchange. Three commonly used types are listed below (RabbitMQ also offers a headers exchange, which is not covered here):

  • fanout – every published message goes to every queue bound to the exchange, so every subscriber receives it.
  • direct – the client binds its queue with a key, and only messages whose routing key matches that key exactly are delivered to it.
  • topic – much like direct, except the binding key can contain wildcards to fine-tune exactly what is delivered to each client.
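To make the difference between fanout and direct concrete, here is a minimal in-memory sketch of the delivery decision. It is only an illustration: ExchangeRoutingSketch and ShouldDeliver are hypothetical names, not part of RabbitMQ.Client, and the real decision is made inside the broker. Topic exchanges are left out here because their wildcard rules are covered later in this article.

```csharp
using System;

// Illustrative in-memory model only; this is NOT part of RabbitMQ.Client.
public static class ExchangeRoutingSketch
{
    // Returns true when a message published with routingKey would be copied
    // into a queue that was bound with bindingKey.
    public static bool ShouldDeliver(string exchangeType, string bindingKey, string routingKey)
    {
        switch (exchangeType)
        {
            case "fanout":
                // Fanout ignores both keys: every bound queue gets a copy.
                return true;
            case "direct":
                // Direct requires an exact, case-sensitive match.
                return string.Equals(bindingKey, routingKey, StringComparison.Ordinal);
            default:
                throw new ArgumentException("Exchange type not handled by this sketch: " + exchangeType);
        }
    }
}
```

For example, ShouldDeliver("direct", "stock.AAPL", "stock.GOOG") is false, while the same keys on a "fanout" exchange give true.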

 

Simple Publish/Subscribe with RabbitMQ and C#

Publishing

To publish you really just need one more command than the basic connection code shown above:

byte[] payload = Encoding.ASCII.GetBytes("Hello world!");
channel.BasicPublish(EXCHANGE_NAME, "", null, payload);
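Because the payload is just a byte array, the publisher and the subscriber have to agree on how text is turned into bytes. The article uses Encoding.ASCII, which is fine for plain English text but replaces any non-ASCII character with "?". The sketch below assumes you might prefer UTF-8 instead; PayloadEncodingSketch and its methods are hypothetical helper names, not part of the RabbitMQ client.

```csharp
using System.Text;

// Hypothetical helper, assuming both sides agree on UTF-8.
public static class PayloadEncodingSketch
{
    // Turns a message into the byte[] that would be handed to BasicPublish.
    public static byte[] Encode(string message)
    {
        return Encoding.UTF8.GetBytes(message);
    }

    // Turns a received byte[] body back into text on the subscriber side.
    public static string Decode(byte[] payload)
    {
        return Encoding.UTF8.GetString(payload);
    }
}
```

If you switch the publisher to UTF-8, remember to switch the subscriber as well, otherwise non-ASCII characters will not survive the trip.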

The full publisher code looks like this:

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
    internal class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        public static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payload = Encoding.ASCII.GetBytes("hello world");
                        channel.BasicPublish(EXCHANGE_NAME, "", null, payload);

                        Console.WriteLine("Sent Message " + i);
                        Thread.Sleep(1000);
                    }
                }
            }
        }
    }
}

 

Subscribing

To subscribe you use the same basic connection code, then declare a queue where the messages routed to you are deposited.

channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

string queueName = channel.QueueDeclare();

channel.QueueBind(queueName, EXCHANGE_NAME, "");

Then you need to create a consumer class to get the messages from the queue.

QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);

BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
Console.WriteLine(Encoding.ASCII.GetString(e.Body));

That’s all you need to get the messages.

Here is the full subscribe code:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    public class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        private static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

                    string queueName = channel.QueueDeclare();

                    channel.QueueBind(queueName, EXCHANGE_NAME, "");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}

 

Publish / Subscribe using Topics

Often your clients are only interested in a subset of what you publish. For instance, perhaps you’re sending out stock information. Some clients might want all updates about a range of different stocks, but maybe some just want a specific stock such as AAPL. This is where topics come in. The publisher attaches a topic to each message, and subscribers filter messages based on that topic. The filtering occurs at the server, so a client isn’t notified until a message matches the client’s filter.

There are only a few small changes needed to get the example above to use topics.

Both the publisher and subscriber must set the exchange type in the ExchangeDeclare statement to “topic” instead of “fanout”, like so:

channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

The publisher must enter the name of the topic for each message when publishing, like so:

channel.BasicPublish(EXCHANGE_NAME, "stock.AAPL", null, payload);

Notice the “stock.AAPL”; previously this was left blank. It’s important to note that RabbitMQ uses a period “.” to separate the levels in a topic. Topics typically go from most generic to most specific, i.e. TopicName = “UNSPECIFIC.MORE_SPECIFIC.MOST_SPECIFIC”. So you could use “STOCK.NASDAQ.BID.AAPL”, which would be a reasonable description for a bid from NASDAQ for Apple stock.
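If you build topics from several pieces, a small helper can keep the dot-separated format consistent. This is only a sketch: RoutingKeySketch.Build is a hypothetical helper, not a RabbitMQ API, and rejecting "*" and "#" inside a word is a choice made here to avoid confusion with the wildcards used when binding.

```csharp
using System;

// Hypothetical helper for assembling dot-separated topics.
public static class RoutingKeySketch
{
    // Joins the words with "." after checking each one is usable as a single level.
    public static string Build(params string[] words)
    {
        if (words == null || words.Length == 0)
            throw new ArgumentException("At least one word is required.");

        foreach (string word in words)
        {
            // A dot inside a word would silently create an extra level.
            if (string.IsNullOrEmpty(word) || word.IndexOfAny(new[] { '.', '*', '#' }) >= 0)
                throw new ArgumentException("Invalid word: '" + word + "'");
        }

        return string.Join(".", words);
    }
}
```

For example, RoutingKeySketch.Build("STOCK", "NASDAQ", "BID", "AAPL") produces the topic used in the paragraph above.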

The subscriber gets to filter the messages using the topics. The topic filter is defined in the QueueBind statement like so:

channel.QueueBind(queueName, EXCHANGE_NAME, "stock.AAPL");

So this will only get items published with a topic of “stock.AAPL”. If the publisher sends “stock.GOOG”, it will not be delivered into the client’s queue.

There are two wildcards available.

  • * is a substitute for exactly one word
  • # is a substitute for zero or more words

So suppose we had these topics being published:
stock.nasdaq.google.bid
stock.nasdaq.aapl.bid
stock.nasdaq.aapl.offer
stock.dow.aapl.offer
stock.nasdaq.google.offer

If you wanted only bids on NASDAQ regardless of the stock, you could subscribe to stock.nasdaq.*.bid.
If you wanted bids on any exchange for any stock, you could subscribe to stock.*.*.bid.
If you wanted bids and offers on any exchange, you could subscribe to stock.#.
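The wildcard rules can be tried out without a broker. The following is a rough in-memory sketch of the matching logic as described above ("*" for exactly one word, "#" for zero or more words). TopicMatchSketch is a hypothetical name and this is not how RabbitMQ implements matching internally; it is only meant for experimenting with patterns.

```csharp
using System;

// Illustrative matcher for topic patterns; NOT part of RabbitMQ.Client.
public static class TopicMatchSketch
{
    public static bool Matches(string bindingPattern, string routingKey)
    {
        return Match(SplitWords(bindingPattern), 0, SplitWords(routingKey), 0);
    }

    // An empty string is treated as zero words rather than one empty word.
    private static string[] SplitWords(string value)
    {
        return value.Length == 0 ? new string[0] : value.Split('.');
    }

    private static bool Match(string[] pattern, int p, string[] key, int k)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (p == pattern.Length) return k == key.Length;

        if (pattern[p] == "#")
        {
            // '#' absorbs zero words, or absorbs one word and is tried again.
            if (Match(pattern, p + 1, key, k)) return true;
            return k < key.Length && Match(pattern, p, key, k + 1);
        }

        // Any other pattern word needs a key word to compare against.
        if (k == key.Length) return false;

        // '*' accepts any single word; otherwise the words must be identical.
        if (pattern[p] == "*" || pattern[p] == key[k])
            return Match(pattern, p + 1, key, k + 1);

        return false;
    }
}
```

For instance, TopicMatchSketch.Matches("stock.nasdaq.*.bid", "stock.nasdaq.google.bid") is true, while the same pattern against "stock.dow.google.bid" is false.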

Here is the full code for the topic publisher:

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
    internal class Program
    {
        private const string EXCHANGE_NAME = "topicexchange";

        public static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payload = Encoding.ASCII.GetBytes("AAPL " + i);
                        channel.BasicPublish(EXCHANGE_NAME, "stock.AAPL", null, payload);

                        payload = Encoding.ASCII.GetBytes("GOOG " + i);
                        channel.BasicPublish(EXCHANGE_NAME, "stock.GOOG", null, payload);

                        Console.WriteLine("Sent Message " + i);
                        Thread.Sleep(1000);
                    }

                }
            }
        }
    }
}

And the full code for the topic subscriber:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    public class Program
    {
        private const string EXCHANGE_NAME = "topicexchange";

        private static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

                    string queueName = channel.QueueDeclare();

                    channel.QueueBind(queueName, EXCHANGE_NAME, "stock.AAPL");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}

 

More RabbitMQ Info

This just scratches the surface of what you can do with RabbitMQ.

I also have a tutorial that shows how to use the EventingBasicConsumer to listen for events instead of blocking. (In case you didn’t notice, the code above does block.) The EventingBasicConsumer raises a Received event for each message, so you don’t need to manage the threading yourself.
