Listening to RabbitMQ Events

Posted on May 24, 2012 in Code Snippets, Networking | 7 comments

Note: if you’re new to RabbitMQ, you might want to read the tutorial I’ve posted on using it first: RabbitMQ C# Tutorial

Consuming with the BasicConsumer class blocks the calling thread while it waits for a message, which isn’t always desirable. If you prefer your code to continue without blocking, RabbitMQ has a handy alternative called EventingBasicConsumer.

The main difference between the two is that no blocking occurs. Instead, EventingBasicConsumer exposes a Received event: you attach a handler, register the consumer on your channel, and RabbitMQ raises the event each time a message is ready.

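// The client version used here has a parameterless constructor;
// newer client versions take the channel: new EventingBasicConsumer(channel)
EventingBasicConsumer consumer = new EventingBasicConsumer();
consumer.Received += (o, e) =>
{
    // e.Body holds the raw message payload
    string data = Encoding.ASCII.GetString(e.Body);
    Console.WriteLine(data);
};

// Start consuming (true = automatic acknowledgement), then bind the
// queue to the exchange under the "myTopic" routing key
string consumerTag = channel.BasicConsume(queueName, true, consumer);
channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");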

Notice that the event arguments passed to the handler have a Body field. This is where the message payload is stored. The arguments also carry other information, such as the message properties (including headers), the exchange name and the routing key.
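
As a rough sketch of reading that other information, the hypothetical helper below (the class and method names are mine, not part of the client library) summarises a delivery using the Exchange, RoutingKey, DeliveryTag, Redelivered and BasicProperties members of BasicDeliverEventArgs. It assumes a client version where Body is a byte[], as in the snippet above.

```csharp
using System;
using System.Text;
using RabbitMQ.Client.Events;

internal static class DeliveryFormatter
{
    // Hypothetical helper: builds a one-line summary of a delivery.
    public static string Describe(BasicDeliverEventArgs e)
    {
        // Assumes Body is a byte[] (older client versions).
        string payload = Encoding.ASCII.GetString(e.Body);

        // Headers are optional, so the properties or the header table may be null.
        bool hasHeaders = e.BasicProperties != null && e.BasicProperties.Headers != null;

        return string.Format(
            "exchange={0} routingKey={1} deliveryTag={2} redelivered={3} hasHeaders={4} payload={5}",
            e.Exchange, e.RoutingKey, e.DeliveryTag, e.Redelivered, hasHeaders, payload);
    }
}
```

It could be wired up with consumer.Received += (o, e) => Console.WriteLine(DeliveryFormatter.Describe(e));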

For completeness, here is a full Listener class using the EventingBasicConsumer, and a Sender class.

The full code for the Listener:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQListener
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Listener");

            const string EXCHANGE_NAME = "EXCHANGE3";
            ConnectionFactory factory = new ConnectionFactory();

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare a non-durable, auto-delete topic exchange
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    // Let the server create a temporary queue and generate its name
                    string queueName = channel.QueueDeclare();

                    EventingBasicConsumer consumer = new EventingBasicConsumer();
                    consumer.Received += (o, e) =>
                    {
                        // Raised each time a message arrives; e.Body is the payload
                        string data = Encoding.ASCII.GetString(e.Body);
                        Console.WriteLine(data);
                    };

                    // true = automatic acknowledgement
                    string consumerTag = channel.BasicConsume(queueName, true, consumer);

                    channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");

                    Console.WriteLine("Listening, press ENTER to quit");
                    Console.ReadLine();

                    channel.QueueUnbind(queueName, EXCHANGE_NAME, "myTopic", null);
                }
            }
        }
    }
}

And here is code for a sample sender:

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQSender
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Sender");
            Console.WriteLine("Press Enter to send data");
            Console.ReadLine();

            const string EXCHANGE_NAME = "EXCHANGE3";

            ConnectionFactory factory = new ConnectionFactory();
            // Set the host before creating the connection; changing it afterwards has no effect
            factory.HostName = "localhost";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    for (int i = 0; i < 1000; i++)
                    {
                        string data = i.ToString();

                        byte[] payload = Encoding.ASCII.GetBytes(data);
                        channel.BasicPublish(EXCHANGE_NAME, "myTopic", null, payload);
                        Console.WriteLine(data);
                    }
                }
            }

            Console.WriteLine("Done Sending");
        }
    }
}

Note that this example uses a topic exchange. The exchange type doesn’t matter here, so you can make it direct or fanout if you prefer.
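
To try a different exchange type, only the ExchangeDeclare call needs to change, in both the listener and the sender. Here is a minimal sketch of a fanout variant of the sender. The exchange name EXCHANGE3_FANOUT is my own assumption, chosen because the broker rejects redeclaring an existing exchange with a different type. It also assumes a broker running on localhost.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQFanoutSender
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            // Hypothetical name: a fresh exchange avoids a type clash with the topic exchange EXCHANGE3.
            const string EXCHANGE_NAME = "EXCHANGE3_FANOUT";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";

            using (IConnection connection = factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                // Same arguments as before; only the type changes (ExchangeType.Direct also works).
                channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Fanout, false, true, null);

                // A fanout exchange ignores the routing key; a direct exchange needs an exact match.
                byte[] payload = Encoding.ASCII.GetBytes("hello");
                channel.BasicPublish(EXCHANGE_NAME, "myTopic", null, payload);
                Console.WriteLine("Sent");
            }
        }
    }
}
```

The listener needs the same exchange name and type in its own ExchangeDeclare call, and should be running before the sender publishes.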

7 Comments

  1. Bill Robertson

    Hi Kelly,

    For the callback:

    consumer.Received += Method

    What thread is Method executed on? Is it a general thread pool thread, like with QueueUserWorkItem? Or does it block something internal to the Rabbit SDK? For example, does the SDK only execute one callback at a time?

  2. admin

    Great question, and one I unfortunately had to learn about the hard way. It runs on an internal RabbitMQ thread, so I would suggest launching your own thread in that method, or you run the risk of several interesting deadlock scenarios (i.e. do not subscribe or unsubscribe in that method, or your app will hang).

  3. Bill Robertson

    Thanks, that gives me an idea. How often did you find yourself unsubscribing from a channel?

  4. admin

    I’m receiving in an application with many windows. Each window displays different things that are being broadcast. All the RabbitMQ stuff is centralized in one wrapper class. When a window starts up, it registers interest in certain topics with that class, and the wrapper class subscribes to RabbitMQ. When a window is closed, it unsubscribes from those topics. The wrapper class then checks, and if no subscribers need a topic any more, it unsubscribes from RabbitMQ.

  5. Manjay

    Suppose I queue 10 messages while no listeners are running. With this program, if I make the queue durable and then start the first listener, it gets all 10 queued messages. What I want is for listeners to receive messages one at a time, so that if other listeners start in the meantime, the remaining messages are distributed among them.

  6. Abi

    Just the implementation we needed.

  7. Jitendra

    Any update on Manjay’s question? I am also facing the same problem.
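
One way to follow the advice in comment 2 and keep work off the internal RabbitMQ thread is to have the Received callback only enqueue the payload, and process it on a thread you own. This is a minimal sketch under assumptions: MessagePump is a hypothetical class of mine, not part of the RabbitMQ client, and the payloads are enqueued by hand here instead of coming from a live consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;

// Hypothetical helper: queues payloads so the Received callback returns
// immediately and the real work runs on our own thread.
internal sealed class MessagePump : IDisposable
{
    private readonly BlockingCollection<byte[]> _pending = new BlockingCollection<byte[]>();
    private readonly Thread _worker;

    public MessagePump(Action<string> handler)
    {
        _worker = new Thread(() =>
        {
            // Blocks until an item arrives; the loop ends once CompleteAdding
            // has been called and the queue has been drained.
            foreach (byte[] body in _pending.GetConsumingEnumerable())
            {
                handler(Encoding.ASCII.GetString(body));
            }
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    // Call this from the Received callback; it only enqueues and returns.
    public void Enqueue(byte[] body)
    {
        _pending.Add(body);
    }

    public void Dispose()
    {
        _pending.CompleteAdding();
        _worker.Join();   // wait for queued messages to be handled
        _pending.Dispose();
    }
}

internal static class PumpDemo
{
    private static void Main()
    {
        using (var pump = new MessagePump(Console.WriteLine))
        {
            // Stand-in for: consumer.Received += (o, e) => pump.Enqueue(e.Body);
            pump.Enqueue(Encoding.ASCII.GetBytes("0"));
            pump.Enqueue(Encoding.ASCII.GetBytes("1"));
        } // Dispose drains the queue before returning
    }
}
```

Because a single worker thread drains the queue, messages are handled in arrival order. Subscribing or unsubscribing can then be done from that worker, or any other thread of your own, instead of inside the callback. In client versions where Body is a ReadOnlyMemory<byte> instead of a byte[], copy it with e.Body.ToArray() before enqueuing.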
