RabbitMQ C# Tutorial

Posted on May 14, 2011 in Featured, Networking, Tutorials | 8 comments

RabbitMQ is a message queue similar to ActiveMQ, IBM MQ Series, and Microsoft Message Queue (MSMQ). It’s free, robust, simple to use, and supports most operating systems.



Getting Started

To get started you will need to download the server and its dependencies. RabbitMQ has a nice Windows bundle that you can grab: RabbitMQ Windows Bundle. If you’re using a different operating system you can grab what you need from the RabbitMQ Server download page.

Once downloaded, run the installer for Erlang, then install the RabbitMQ Server, then the RabbitMQ DotNet client.

That’s all you need to get things running, so let’s jump into the code!

The full source code for this article is available for download. Go grab a copy and follow along!

Download the full source code



Reference the RabbitMQ Client Library for DotNet

To communicate with RabbitMQ you first need to add a reference to the RabbitMQ.Client.dll library that you installed. (The default location would be: C:\Program Files (x86)\RabbitMQ\DotNetClient\bin\RabbitMQ.Client.dll.)

 

RabbitMQ Basics

To talk with RabbitMQ you first need to make a connection, define a channel, and declare an exchange.

private const string EXCHANGE_NAME = "helloworld";

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";

IConnection connection = factory.CreateConnection();
IModel channel = connection.CreateModel();

channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

You can see we use localhost as our name for the host. You can do this if RabbitMQ is on the same machine that your code is on. If they are on different machines you would need to put the name of the machine that is running RabbitMQ in the place of localhost.

The ExchangeDeclare call is central to how RabbitMQ works. Your applications will typically all use the same exchange name so they can communicate and share data, but the second parameter is the interesting one: it defines the type of exchange. There are three basic types:

  • fanout – every published message is delivered to every queue bound to the exchange, so every subscriber gets a copy.
  • direct – the client binds its queue with a key, and only messages whose routing key matches that key exactly are delivered to it.
  • topic – much like direct, except the binding key can contain wildcards to fine-tune exactly what is delivered to each client.
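
To make the difference between fanout and direct concrete, here is a minimal in-memory sketch. It does not talk to a broker or use RabbitMQ.Client at all; the ExchangeSketch class, the Binding type, and the Route method are hypothetical names made up for illustration, and topic wildcards are deliberately left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory illustration only: no broker, no RabbitMQ.Client calls.
// All names here are hypothetical and exist only for this sketch.
public static class ExchangeSketch
{
    // A binding ties a queue to the exchange with a binding key.
    public sealed class Binding
    {
        public string Queue { get; }
        public string Key { get; }

        public Binding(string queue, string key)
        {
            Queue = queue;
            Key = key;
        }
    }

    // Returns the names of the queues that would receive a message
    // published with the given routing key.
    public static List<string> Route(string exchangeType, IEnumerable<Binding> bindings, string routingKey)
    {
        switch (exchangeType)
        {
            case "fanout":
                // The routing key is ignored: every bound queue gets a copy.
                return bindings.Select(b => b.Queue).Distinct().ToList();
            case "direct":
                // Only queues whose binding key equals the routing key.
                return bindings.Where(b => b.Key == routingKey)
                               .Select(b => b.Queue)
                               .Distinct()
                               .ToList();
            default:
                throw new ArgumentException("Exchange type not covered by this sketch: " + exchangeType);
        }
    }

    public static void Main()
    {
        var bindings = new List<Binding>
        {
            new Binding("queueA", "stock.AAPL"),
            new Binding("queueB", "stock.GOOG"),
        };

        // prints "queueA, queueB"
        Console.WriteLine(string.Join(", ", Route("fanout", bindings, "stock.AAPL")));
        // prints "queueA"
        Console.WriteLine(string.Join(", ", Route("direct", bindings, "stock.AAPL")));
    }
}
```

The real broker does this routing for you; the sketch only shows why a fanout subscriber can bind with an empty key while a direct subscriber cannot.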


Simple Publish/Subscribe with RabbitMQ and C#

Publishing

To publish you really just need one more command than the basic connection code shown above:

byte[] payload = Encoding.ASCII.GetBytes("Hello world!");
channel.BasicPublish(EXCHANGE_NAME, "", null, payload);

The full publisher code looks like this:

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
    internal class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        public static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payload = Encoding.ASCII.GetBytes("hello world");
                        channel.BasicPublish(EXCHANGE_NAME, "", null, payload);

                        Console.WriteLine("Sent Message " + i);
                        Thread.Sleep(1000);
                    }
                }
            }
        }
    }
}


Subscribing

To subscribe, you start with the same basic connection code, then declare a queue and bind it to the exchange. Messages that are routed to you get deposited in that queue.

channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

string queueName = channel.QueueDeclare();

channel.QueueBind(queueName, EXCHANGE_NAME, "");

Then you need to create a consumer class to get the messages from the queue.

QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);

BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
Console.WriteLine(Encoding.ASCII.GetString(e.Body));

That’s all you need to get the messages.
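
One detail worth seeing in isolation: the payload is just a byte array, so the publisher and subscriber must agree on the text encoding. The sketch below runs without a broker and shows the round trip. The PayloadSketch class and its Encode/Decode helpers are hypothetical names, and using UTF-8 is my assumption for the comparison; the listings in this article use ASCII, which is fine as long as your messages contain only ASCII characters.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
public static class PayloadSketch
{
    // What the publisher would hand to BasicPublish.
    public static byte[] Encode(string message) => Encoding.UTF8.GetBytes(message);

    // What the subscriber would do with the received body.
    public static string Decode(byte[] payload) => Encoding.UTF8.GetString(payload);

    public static void Main()
    {
        string original = "AAPL bid €150";

        // ASCII cannot represent the euro sign, so it is replaced with '?'.
        string viaAscii = Encoding.ASCII.GetString(Encoding.ASCII.GetBytes(original));
        string viaUtf8 = Decode(Encode(original));

        Console.WriteLine(viaAscii == original); // prints "False"
        Console.WriteLine(viaUtf8 == original);  // prints "True"
    }
}
```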

Here is the full subscribe code:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    public class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        private static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

                    string queueName = channel.QueueDeclare();

                    channel.QueueBind(queueName, EXCHANGE_NAME, "");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}


Publish / Subscribe using Topics

Many times when you publish info, your clients may only be interested in a subset. For instance, perhaps you’re sending out stock information. Some clients might be interested in listening to all updates about a range of different stocks, but maybe some just want a specific stock such as AAPL. This is where topics come in. You define a topic when a message is published, then the subscribers filter the messages based on the topic. The filtering occurs at the server, so the client isn’t notified until there is a message that matches the client’s filter.

There are only a few small changes needed to get the example above to use topics.

Both the publisher and subscriber must define the type in the ExchangeDeclare statement to be “topic” instead of fanout, like so:

channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

The publisher must enter the name of the topic for each message when publishing, like so:

channel.BasicPublish(EXCHANGE_NAME, "stock.AAPL", null, payload);

Notice the “stock.AAPL” routing key; previously this was left blank. It’s important to note that RabbitMQ uses a period “.” to separate the levels (words) of a routing key. Keys typically go from most generic first to most specific, i.e. TopicName = “UNSPECIFIC.MORE_SPECIFIC.MOST_SPECIFIC”. So you can go “STOCK.NASDAQ.BID.AAPL”. This would be a reasonable description for a bid from NASDAQ for Apple stock.
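
If you build routing keys from several parts, a small helper keeps the general-to-specific order and the “.” separators consistent. This is only a sketch: RoutingKeySketch and Build are hypothetical names, not part of the RabbitMQ client. The 255-byte check reflects the AMQP 0-9-1 limit on routing key length.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
public static class RoutingKeySketch
{
    // Joins the words, ordered from most general to most specific, with '.'.
    public static string Build(params string[] words)
    {
        if (words == null || words.Length == 0)
            throw new ArgumentException("At least one word is required.");

        foreach (string word in words)
        {
            // A '.' inside a word would silently add an extra level.
            if (string.IsNullOrEmpty(word) || word.Contains("."))
                throw new ArgumentException("Words must be non-empty and must not contain '.'.");
        }

        string key = string.Join(".", words);

        // AMQP 0-9-1 limits a routing key to 255 bytes.
        if (Encoding.UTF8.GetByteCount(key) > 255)
            throw new ArgumentException("Routing key exceeds 255 bytes.");

        return key;
    }

    public static void Main()
    {
        // prints "stock.nasdaq.bid.aapl"
        Console.WriteLine(Build("stock", "nasdaq", "bid", "aapl"));
    }
}
```

The result would be passed as the second argument to BasicPublish in place of the hard-coded string.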

The subscriber gets to filter the messages using the topics. The topic filter is defined in the QueueBind statement like so:

channel.QueueBind(queueName, EXCHANGE_NAME, "stock.AAPL");

So you can see this will only get items published with a topic of “stock.AAPL”. If the publisher sends “stock.GOOG”, it will not be delivered into the client’s queue.

Two wildcards are available:

  • * is a substitute for exactly one word
  • # is a substitute for zero or more words

So suppose we had these topics being published:

  • stock.nasdaq.google.bid
  • stock.nasdaq.aapl.bid
  • stock.nasdaq.aapl.offer
  • stock.dow.aapl.offer
  • stock.nasdaq.google.offer

If you wanted only bids on nasdaq, regardless of the stock, you could subscribe to stock.nasdaq.*.bid.
If you wanted bids on any exchange for any stock, you could use stock.*.*.bid.
If you wanted bids and offers on any exchange, you could use stock.#.
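
The wildcard rules can be checked without a broker. The following is a rough sketch of the matching logic as described above, not RabbitMQ’s actual implementation; TopicMatchSketch and Matches are hypothetical names used only here.

```csharp
using System;

// Illustration of the '*' and '#' rules; the broker does this for you.
public static class TopicMatchSketch
{
    // True if the routing key satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern used up: it matches only if the key is used up too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible count.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // '*' and literal words each need exactly one key word.
        if (ki == k.Length) return false;
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }

    public static void Main()
    {
        string[] keys =
        {
            "stock.nasdaq.google.bid",
            "stock.nasdaq.aapl.bid",
            "stock.nasdaq.aapl.offer",
            "stock.dow.aapl.offer",
            "stock.nasdaq.google.offer",
        };

        // Show which published keys a "stock.nasdaq.*.bid" binding would receive.
        foreach (string key in keys)
        {
            Console.WriteLine(key + " -> " + Matches("stock.nasdaq.*.bid", key));
        }
    }
}
```

Running it against the five keys shows that only the two nasdaq bids pass the stock.nasdaq.*.bid filter, while stock.# would accept all five.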

Here is the full code for the topic publisher:

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
    internal class Program
    {
        private const string EXCHANGE_NAME = "topicexchange";

        public static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payload = Encoding.ASCII.GetBytes("AAPL " + i);
                        channel.BasicPublish(EXCHANGE_NAME, "stock.AAPL", null, payload);

                        payload = Encoding.ASCII.GetBytes("GOOG " + i);
                        channel.BasicPublish(EXCHANGE_NAME, "stock.GOOG", null, payload);

                        Console.WriteLine("Sent Message " + i);
                        Thread.Sleep(1000);
                    }

                }
            }
        }
    }
}

And the full code for the topic subscriber:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    public class Program
    {
        private const string EXCHANGE_NAME = "topicexchange";

        private static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory {HostName = "localhost"};

            using(IConnection connection = factory.CreateConnection())
            {
                using(IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

                    string queueName = channel.QueueDeclare();

                    channel.QueueBind(queueName, EXCHANGE_NAME, "stock.AAPL");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}


More RabbitMQ Info

This is obviously just scratching the surface of what you can do with RabbitMQ.

I also have a tutorial that shows how to use the EventingBasicConsumer to listen for events instead of blocking. (In case you didn’t notice, the code above blocks on each Dequeue call.) The RabbitMQ EventingBasicConsumer raises a Received event instead, so you don’t need to worry about the threading yourself.

8 Comments

  1. Daniel

    Hi

    The download of the full source code does not work. Is it possible to fix it?

    Thanks

  2. admin

    Thanks for letting me know! The download has been fixed.

  3. Gaurav

    please can u tell me code in wcf ?
    & using exchange & it is a website in which i am going to use it …
    please tell me …

  4. khan

    hi i want to make an group messaging service with website . an example site is http://www.smsall.pk kindly do check it and tell if this is possible. using my gsm modem/phone connected to pc. for sms sending and recieving

  5. admin

    I posted a link to the WCF bindings for RabbitMQ, but I don’t have code that uses it as I opted to use the native API and not WCF.

  6. Sukhjeet

    I am still getting a exception “None of the specified endpoints were reachable”. Can you please help me regarding this. I will be thankful to you.How can I solve this error/Exception.

  7. kk

    can you suggest how to implement Data Access Layer in this projects. like, can we have another layer to manage db or subscriber is good enuf to manage??

    thanks in advance….

  8. Kelly Elias

    It really depends on the nature of your solution. Most people end up going for a service orientated architecture (SOA).

    Either you have each app in the architecture handle their own DB or you make a service that does it for them; such a service would just hang off rabbitmq (your service bus) and watch the traffic and write to the db as needed. If you like it can also perform queries and publish the results on the bus.

    Most of the time I see apps that are hybrid approaches since sending all your SQL traffic over the bus is slow. It’s best to keep in mind who the audience is when deciding. If only one app or instance gets a benefit then do it locally, if multiple apps or instances benefit then do it as a service. What I see quite a bit in financial circles is local apps reading from the db to get historical info, then listening to the bus for realtime info. Apps that gather the realtime data and broadcast on the bus don’t save to the db. Instead a service is added that listens to the traffic just like a client and its responsibility is to persist the messages to the db, cache them for quick retrieval etc.
