Jarloo

Menu

Listening to RabbitMQ Events

Note: if your new to RabbitMQ you might want to read the tutorial I’ve posted on using it first. RabbitMQ C# Tutorial

Using the BasicConsumer class in RabbitMQ blocks which isn’t always desirable. If you prefer your code to continue without blocking you should look at the EventingBasicConsumer class.

RabbitMQ has a handy consumer you can use instead of the BasicConsumer called EventingBasicConsumer. This Consumer class exposes a Received Event, so you can bind to your channel and have RabbitMQ give you a nice event when a message is ready.

The main difference between using an EventingBasicConsumer and BasicConsumer is the no blocking occurs. Instead the EventingBasicConsumer exposes a Received Event when a new message is ready.

EventingBasicConsumer consumer = new EventingBasicConsumer();
consumer.Received += (o, e) =>
                            {
                                string data = Encoding.ASCII.GetString(e.Body);
                                Console.WriteLine(data);
                            };

string consumerTag = channel.BasicConsume(queueName, true, consumer);
channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");

Notice that the EventingBasicConsumer passes arguments with a Body field. This is where the message payload is stored. Other information is also given in the arguments, such as headers, topic name etc…

For completeness sake here is a full Listener class using the EventingBasicConsumer, and a Sender class.

The full code for the Listener:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQListener
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Listener");

            const string EXCHANGE_NAME = "EXCHANGE3";
            ConnectionFactory factory = new ConnectionFactory();

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    string queueName = channel.QueueDeclare();

                    EventingBasicConsumer consumer = new EventingBasicConsumer();
                    consumer.Received += (o, e) =>
                                             {
                                                 string data = Encoding.ASCII.GetString(e.Body);
                                                 Console.WriteLine(data);
                                             };

                    string consumerTag = channel.BasicConsume(queueName, true, consumer);

                    channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");

                    Console.WriteLine("Listening press ENTER to quit");
                    Console.ReadLine();

                    channel.QueueUnbind(queueName, EXCHANGE_NAME, "myTopic", null);
                }
            }
        }
    }
}

And here is code for a sample sender:

using System;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQSender
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Sender");
            Console.WriteLine("Press Enter to send data");
            Console.ReadLine();

            const string EXCHANGE_NAME = "EXCHANGE3";

            ConnectionFactory factory = new ConnectionFactory();

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    factory.HostName = "localhost";

                    channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);

                    for (int i = 0; i < 1000; i++)
                    {
                        string data = i.ToString();

                        byte[] payload = Encoding.ASCII.GetBytes(data);
                        channel.BasicPublish(EXCHANGE_NAME, "myTopic", null, payload);
                        Console.WriteLine(data);
                    }
                }
            }

            Console.WriteLine("Done Sending");
        }
    }
}

Note that this example uses a topic exchange. This doesn’t matter for this example, so you can make it direct, or fanout if you prefer.

Categories:   Code

Tags:  , , ,

Comments

Sorry, comments are closed for this item.