Note: if your new to RabbitMQ you might want to read the tutorial I’ve posted on using it first. RabbitMQ C# Tutorial
Using the BasicConsumer class in RabbitMQ blocks which isn’t always desirable. If you prefer your code to continue without blocking you should look at the EventingBasicConsumer class.
RabbitMQ has a handy consumer you can use instead of the BasicConsumer called EventingBasicConsumer. This Consumer class exposes a Received Event, so you can bind to your channel and have RabbitMQ give you a nice event when a message is ready.
The main difference between using an EventingBasicConsumer and BasicConsumer is the no blocking occurs. Instead the EventingBasicConsumer exposes a Received Event when a new message is ready.
EventingBasicConsumer consumer = new EventingBasicConsumer();
consumer.Received += (o, e) =>
{
string data = Encoding.ASCII.GetString(e.Body);
Console.WriteLine(data);
};
string consumerTag = channel.BasicConsume(queueName, true, consumer);
channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");
Notice that the EventingBasicConsumer passes arguments with a Body field. This is where the message payload is stored. Other information is also given in the arguments, such as headers, topic name etc…
For completeness sake here is a full Listener class using the EventingBasicConsumer, and a Sender class.
The full code for the Listener:
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMQListener
{
internal class Program
{
private static void Main(string[] args)
{
Console.WriteLine("Listener");
const string EXCHANGE_NAME = "EXCHANGE3";
ConnectionFactory factory = new ConnectionFactory();
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);
string queueName = channel.QueueDeclare();
EventingBasicConsumer consumer = new EventingBasicConsumer();
consumer.Received += (o, e) =>
{
string data = Encoding.ASCII.GetString(e.Body);
Console.WriteLine(data);
};
string consumerTag = channel.BasicConsume(queueName, true, consumer);
channel.QueueBind(queueName, EXCHANGE_NAME, "myTopic");
Console.WriteLine("Listening press ENTER to quit");
Console.ReadLine();
channel.QueueUnbind(queueName, EXCHANGE_NAME, "myTopic", null);
}
}
}
}
}
And here is code for a sample sender:
using System;
using System.Text;
using RabbitMQ.Client;
namespace RabbitMQSender
{
internal class Program
{
private static void Main(string[] args)
{
Console.WriteLine("Sender");
Console.WriteLine("Press Enter to send data");
Console.ReadLine();
const string EXCHANGE_NAME = "EXCHANGE3";
ConnectionFactory factory = new ConnectionFactory();
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
factory.HostName = "localhost";
channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, false, true, null);
for (int i = 0; i < 1000; i++)
{
string data = i.ToString();
byte[] payload = Encoding.ASCII.GetBytes(data);
channel.BasicPublish(EXCHANGE_NAME, "myTopic", null, payload);
Console.WriteLine(data);
}
}
}
Console.WriteLine("Done Sending");
}
}
}
Note that this example uses a topic exchange. This doesn’t matter for this example, so you can make it direct, or fanout if you prefer.
4 Comments
Join the conversation and post a comment.



Hi Kelly,
For the callback:
consumer.Received += Method
What thread is Method executed on? Is it a general thread pool thread like QueueUserWorkItem. Or does it block something internal to Rabbit SDK? Like does the sdk only execute one call back at a time.
Great question, one I had to unfortunately learn about the hard way. It runs on an internal RabbitMQ thread, so I would suggest launching your own thread in that method, or you run the risk of several interesting deadlock scenarios. (ie: do not subscribe or unsubscribe etc in that method or your app will hang)
Thanks, that gives me an idea. How often did you find yourself unsubscribing from a channel?
I’m receiving in an application with many windows. Each window displays different things that are being broadcast. All rabbitmq stuff is centralized in one wrapper class. When a window starts up it registers interest in certain topics with that class. This wrapper class subscribes to rabbitmq. When a window is closed the window unsubscrribes from those topics. That wrapper class checks and if no subscribers need that topic any more it unsubscrribes from rabbitmq.