Jarloo

Listening to RabbitMQ Events

Note: if you’re new to RabbitMQ, you might want to first read the tutorial I’ve posted on using it: RabbitMQ C# Tutorial

Consuming with the BasicConsumer class in RabbitMQ blocks the calling thread, which isn’t always desirable. If you’d prefer your code to continue without blocking, look at the EventingBasicConsumer class instead.

EventingBasicConsumer exposes a Received event: you attach a handler, register the consumer on your channel, and RabbitMQ raises the event whenever a message is ready. That is the main difference between the two consumers: with EventingBasicConsumer, no blocking occurs.

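// Create the consumer and handle each delivery in the Received event.
EventingBasicConsumer consumer = new EventingBasicConsumer();
consumer.Received += (o, e) =>
{
    // e.Body holds the raw message payload.
    string data = Encoding.ASCII.GetString(e.Body);
    Console.WriteLine(data);
};

// Start consuming with noAck set to true (no explicit acknowledgements),
// then bind the queue to the exchange under the "myTopic" routing key.
string consumerTag = channel.BasicConsume(queueName, true, consumer);
channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");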

Notice that the arguments passed to the Received handler have a Body field, which is where the message payload is stored. The arguments carry other information too, such as the message properties (including headers), the routing key (the topic) and the exchange name.
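
As a rough sketch of using those extra fields, the helper below formats the exchange, routing key, redelivered flag and payload of one delivery into a single log line. DeliveryFormatter and Describe are made-up names for illustration and are not part of RabbitMQ.Client. The values are assumed to come from the Exchange, RoutingKey, Redelivered and Body properties of the event arguments, with Body assumed to be a byte[] as in the listings in this post.

```csharp
using System;
using System.Text;

internal static class DeliveryFormatter
{
    // Hypothetical helper (not part of RabbitMQ.Client): builds one log line
    // from the values a single delivery carries.
    public static string Describe(string exchange, string routingKey, bool redelivered, byte[] body)
    {
        // The payload is decoded as ASCII to match the sender in this post.
        string payload = Encoding.ASCII.GetString(body);
        return string.Format("[{0}/{1}] redelivered={2} payload={3}",
                             exchange, routingKey, redelivered, payload);
    }
}
```

Inside the Received handler this could be called as Console.WriteLine(DeliveryFormatter.Describe(e.Exchange, e.RoutingKey, e.Redelivered, e.Body)); assuming the client version in use exposes Body as a byte[].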

For completeness’ sake, here is a full Listener class using the EventingBasicConsumer, and a Sender class.

The full code for the Listener:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQListener
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Listener");

            const string EXCHANGE_NAME = "EXCHANGE3";
            ConnectionFactory factory = new ConnectionFactory();

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    string queueName = channel.QueueDeclare();

                    EventingBasicConsumer consumer = new EventingBasicConsumer();
                    consumer.Received += (o, e) =>
                                             {
                                                 string data = Encoding.ASCII.GetString(e.Body);
                                                 Console.WriteLine(data);
                                             };

                    string consumerTag = channel.BasicConsume(queueName, true, consumer);

                    channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");

                    Console.WriteLine("Listening press ENTER to quit");
                    Console.ReadLine();

                    channel.QueueUnbind(queueName, EXCHANGE_NAME, "myTopic", null);
                }
            }
        }
    }
}

And here is code for a sample sender:

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQSender
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Sender");
            Console.WriteLine("Press Enter to send data");
            Console.ReadLine();

            const string EXCHANGE_NAME = "EXCHANGE3";

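            // Set the host before creating the connection;
            // changing it afterwards does not affect an open connection.
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {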
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    for (int i = 0; i < 1000; i++)
                    {
                        string data = i.ToString();

                        byte[] payload = Encoding.ASCII.GetBytes(data);
                        channel.BasicPublish(EXCHANGE_NAME, "myTopic", null, payload);
                        Console.WriteLine(data);
                    }
                }
            }

            Console.WriteLine("Done Sending");
        }
    }
}

Note that this example uses a topic exchange. This doesn’t matter for this example, so you can make it direct, or fanout if you prefer.
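
One practical caveat, raised in the comments on this post: the Received handler runs on an internal RabbitMQ client thread, so doing long-running work in it (or subscribing and unsubscribing from inside it) can hang the application. Below is a minimal sketch of one way to hand messages off to your own worker instead. MessagePump is a hypothetical class name, not part of RabbitMQ.Client, and the sketch assumes payloads arrive as byte[] as in the listings above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not part of RabbitMQ.Client: buffers payloads so the
// Received handler can return immediately while processing happens elsewhere.
internal sealed class MessagePump : IDisposable
{
    private readonly BlockingCollection<byte[]> _queue = new BlockingCollection<byte[]>();
    private readonly Task _worker;

    public MessagePump(Action<byte[]> process)
    {
        // A single long-running worker drains the queue in arrival order.
        _worker = Task.Factory.StartNew(() =>
        {
            foreach (byte[] payload in _queue.GetConsumingEnumerable())
            {
                process(payload);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Call this from the Received handler; it only enqueues and returns.
    public void Post(byte[] payload)
    {
        _queue.Add(payload);
    }

    // Stops accepting payloads, then waits for the worker to finish the backlog.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Wait();
        _queue.Dispose();
    }
}
```

With this in place the handler shrinks to consumer.Received += (o, e) => pump.Post(e.Body); and any slow work, or any call back into the channel, happens on the worker rather than on the client's own thread. A single worker keeps messages in order; whether that is fast enough depends on your message rate.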

Comments

  • Posted: August 27, 2012 14:57

    Bill Robertson

    Hi Kelly, for the callback consumer.Received += Method, what thread is Method executed on? Is it a general thread pool thread, like QueueUserWorkItem? Or does it block something internal to the Rabbit SDK? For example, does the SDK only execute one callback at a time?
    • Posted: August 27, 2012 15:05

      admin

      Great question, one I had to unfortunately learn about the hard way. It runs on an internal RabbitMQ thread, so I would suggest launching your own thread in that method, or you run the risk of several interesting deadlock scenarios. (ie: do not subscribe or unsubscribe etc in that method or your app will hang)
      • Posted: August 27, 2012 16:17

        Bill Robertson

        Thanks, that gives me an idea. How often did you find yourself unsubscribing from a channel?
      • Posted: August 27, 2012 16:48

        admin

        I'm receiving in an application with many windows. Each window displays different things that are being broadcast. All RabbitMQ stuff is centralized in one wrapper class. When a window starts up, it registers interest in certain topics with that class, and the wrapper class subscribes to RabbitMQ. When a window is closed, it unsubscribes from those topics. The wrapper class then checks, and if no subscribers need a topic any more, it unsubscribes from RabbitMQ.
  • Posted: April 10, 2014 06:43

    Manjay

    Suppose I queued 10 messages and no listeners are on. In this program, if I make the queue durable and start the first listener, it will get all 10 queued messages. But I want listeners to get messages one by one, and if other listeners come along in between, the messages should be distributed among them.
  • Posted: November 18, 2014 19:51

    Abi

    Just the implementation we needed.
  • Posted: February 3, 2015 22:26

    Jitendra

    Any update on Manjay's question? I am also facing the same problem.
  • Posted: August 14, 2015 21:03

    scott

    Great post Bill. How do you manage Ack / Nack when using EventingBasicConsumer? Using RabbitMQ.Client 3.5.4.0.
    • Posted: August 19, 2015 11:19

      Kelly Elias

      I've never had to manage that myself. All my messages are essentially fire and forget as they are price quotes such as bids and offers and update too frequently to really require it.