C# Code, Tutorials and Full Visual Studio Projects

Listening to RabbitMQ Events

Posted on May 24, 2012 in Code Snippets, Networking

Note: if you're new to RabbitMQ, you might want to read my RabbitMQ C# Tutorial first.

Using a blocking consumer such as QueueingBasicConsumer holds up your code while it waits for the next message, which isn't always desirable. If you would rather have your code carry on without blocking, look at the EventingBasicConsumer class.

EventingBasicConsumer exposes a Received event. You attach a handler to it, register the consumer on your channel, and RabbitMQ raises the event each time a message is ready. That is the main difference from the blocking consumer: nothing in your code sits waiting for a message.
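
To make the contrast concrete without needing a broker, here is a minimal sketch of the two delivery styles. DemoSource is a hypothetical in-memory stand-in, not part of RabbitMQ.Client; it only illustrates the difference between waiting in a Dequeue call and being called back through an event.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a message source; NOT a RabbitMQ.Client type.
public sealed class DemoSource
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();

    // Event-style delivery: subscribers are called when a message is published.
    public event Action<string> Received;

    public void Publish(string message)
    {
        _queue.Add(message);

        // Copy the delegate first so a concurrent unsubscribe cannot null it mid-call.
        Action<string> handler = Received;
        if (handler != null)
        {
            handler(message);
        }
    }

    // Blocking-style delivery: the caller waits here until a message is available.
    public string Dequeue()
    {
        return _queue.Take();
    }
}
```

With the blocking style the caller's thread is parked inside Dequeue until something arrives. With the event style the caller subscribes once and is free to do other work.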

EventingBasicConsumer consumer = new EventingBasicConsumer();
consumer.Received += (o, e) =>
{
    // e.Body holds the raw message payload.
    string data = Encoding.ASCII.GetString(e.Body);
    Console.WriteLine(data);
};

// The second argument (true) turns on automatic acknowledgement.
string consumerTag = channel.BasicConsume(queueName, true, consumer);
channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");

Notice that the EventingBasicConsumer passes event arguments with a Body field. This is where the message payload is stored. The arguments also carry other information, such as the routing key, the exchange name and the message properties (including headers).
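
Because the payload is just bytes, the sender and listener have to agree on how text is encoded. The listings in this post use Encoding.ASCII, which is fine for the digits sent here but replaces any non-ASCII character with a question mark. The sketch below shows a UTF-8 alternative next to the ASCII round trip. PayloadCodec is a hypothetical helper name, not part of RabbitMQ.Client, and it assumes the body is available as a byte[]; newer client versions may expose it as a different type, so check the version you use.

```csharp
using System;
using System.Text;

// Hypothetical helper for turning message text into a payload and back.
public static class PayloadCodec
{
    // Encode text as UTF-8 bytes, suitable for passing to BasicPublish.
    public static byte[] Encode(string text)
    {
        return Encoding.UTF8.GetBytes(text);
    }

    // Decode a UTF-8 payload, for example the Body of a received message.
    public static string Decode(byte[] body)
    {
        return Encoding.UTF8.GetString(body);
    }

    // What the ASCII round trip used in the listings does to the same text.
    public static string AsciiRoundTrip(string text)
    {
        return Encoding.ASCII.GetString(Encoding.ASCII.GetBytes(text));
    }
}
```

Plain ASCII text such as "42" survives both routes unchanged; only text with accented or other non-ASCII characters needs the UTF-8 version.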

For completeness' sake, here is a full Listener class using the EventingBasicConsumer, and a Sender class.

The full code for the Listener:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQListener
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Listener");

            const string EXCHANGE_NAME = "EXCHANGE3";
            ConnectionFactory factory = new ConnectionFactory();

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Non-durable, auto-delete topic exchange.
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    // Let the broker generate a queue name for this listener.
                    string queueName = channel.QueueDeclare();

                    EventingBasicConsumer consumer = new EventingBasicConsumer();
                    consumer.Received += (o, e) =>
                    {
                        // e.Body holds the raw message payload.
                        string data = Encoding.ASCII.GetString(e.Body);
                        Console.WriteLine(data);
                    };

                    // The second argument (true) turns on automatic acknowledgement.
                    string consumerTag = channel.BasicConsume(queueName, true, consumer);

                    channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");

                    Console.WriteLine("Listening, press ENTER to quit");
                    Console.ReadLine();

                    channel.QueueUnbind(queueName, EXCHANGE_NAME, "myTopic", null);
                }
            }
        }
    }
}

And here is code for a sample sender:

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQSender
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Sender");
            Console.WriteLine("Press Enter to send data");
            Console.ReadLine();

            const string EXCHANGE_NAME = "EXCHANGE3";

            // Set the host before creating the connection; changing it afterwards has no effect.
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    for (int i = 0; i < 1000; i++)
                    {
                        string data = i.ToString();

                        byte[] payload = Encoding.ASCII.GetBytes(data);
                        channel.BasicPublish(EXCHANGE_NAME, "myTopic", null, payload);
                        Console.WriteLine(data);
                    }
                }
            }

            Console.WriteLine("Done Sending");
        }
    }
}

Note that this example uses a topic exchange. The exchange type doesn't matter here, because the binding key "myTopic" contains no wildcards, so you can use a direct or fanout exchange instead if you prefer.
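
To see why the exchange type makes no difference for a key like "myTopic", it helps to model how a topic exchange compares a binding key with a routing key: keys are split into dot-separated words, `*` stands for exactly one word and `#` for zero or more words. The sketch below is a simplified model of that rule for illustration only, not the broker's implementation, and TopicMatcher is a hypothetical name. A direct exchange requires the two keys to be equal, and a fanout exchange ignores the routing key altogether.

```csharp
using System;

// Simplified, illustrative model of topic-exchange key matching.
public static class TopicMatcher
{
    public static bool Matches(string bindingKey, string routingKey)
    {
        string[] pattern = bindingKey.Split('.');
        string[] words = routingKey.Split('.');
        return Match(pattern, 0, words, 0);
    }

    private static bool Match(string[] pattern, int pi, string[] words, int wi)
    {
        // Pattern exhausted: it is a match only if every word was consumed too.
        if (pi == pattern.Length)
        {
            return wi == words.Length;
        }

        if (pattern[pi] == "#")
        {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = wi; next <= words.Length; next++)
            {
                if (Match(pattern, pi + 1, words, next))
                {
                    return true;
                }
            }
            return false;
        }

        // Any other pattern word needs a routing-key word to compare against.
        if (wi == words.Length)
        {
            return false;
        }

        // '*' matches any single word; otherwise the words must be identical.
        if (pattern[pi] == "*" || pattern[pi] == words[wi])
        {
            return Match(pattern, pi + 1, words, wi + 1);
        }

        return false;
    }
}
```

With no wildcards in the binding key, Matches reduces to a plain equality check, which is exactly what a direct exchange does.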

5 Comments

  1. Bill Robertson

    Hi Kelly,

    For the callback:

    consumer.Received += Method

    What thread is Method executed on? Is it a general thread pool thread, like one used by QueueUserWorkItem? Or does it block something internal to the Rabbit SDK? For example, does the SDK only execute one callback at a time?

  2. admin

    Great question, and one I unfortunately had to learn about the hard way. It runs on an internal RabbitMQ thread, so I would suggest launching your own thread in that method, or you run the risk of several interesting deadlock scenarios (i.e. do not subscribe or unsubscribe in that method, or your app will hang).

  3. Bill Robertson

    Thanks, that gives me an idea. How often did you find yourself unsubscribing from a channel?

  4. admin

    I’m receiving in an application with many windows. Each window displays different things that are being broadcast. All the RabbitMQ code is centralized in one wrapper class. When a window starts up it registers interest in certain topics with that class, and the wrapper class subscribes to RabbitMQ. When a window is closed it unsubscribes from those topics. The wrapper class then checks, and if no subscribers need a topic any more it unsubscribes from RabbitMQ.

  5. Manjay

    Suppose I queue 10 messages while no listeners are running. In this program, if I make the queue durable and then start the first listener, it gets all 10 queued messages. What I want is for listeners to get messages one at a time, so that if other listeners start in the meantime the messages are distributed among them.
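
The advice in comment 2 (do the real work on your own thread instead of on RabbitMQ's internal thread) can be sketched as follows. MessagePump is a hypothetical helper, not part of RabbitMQ.Client: the Received handler would only call Post, which enqueues the message and returns straight away, while a dedicated worker thread runs the handler you supply. This is one possible arrangement under those assumptions, not the only way to do it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: moves message handling off the thread that raised the event.
public sealed class MessagePump : IDisposable
{
    private readonly BlockingCollection<string> _inbox = new BlockingCollection<string>();
    private readonly Thread _worker;

    public MessagePump(Action<string> handler)
    {
        if (handler == null) throw new ArgumentNullException(nameof(handler));

        _worker = new Thread(() =>
        {
            // Blocks until an item arrives; the loop ends after CompleteAdding is called.
            foreach (string message in _inbox.GetConsumingEnumerable())
            {
                handler(message);
            }
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    // Intended to be called from the Received callback: it only enqueues and returns.
    public void Post(string message)
    {
        _inbox.Add(message);
    }

    // Stops accepting messages, lets the worker drain the inbox, then waits for it.
    // Do not call this from inside the handler, or Join would wait on itself.
    public void Dispose()
    {
        _inbox.CompleteAdding();
        _worker.Join();
        _inbox.Dispose();
    }
}
```

In the listener above, the body of the Received handler would then shrink to something like pump.Post(Encoding.ASCII.GetString(e.Body)), and any subscribing or unsubscribing could happen safely inside the handler passed to MessagePump. The sketch does not catch exceptions thrown by that handler, so real code would want to add that.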
