A RabbitMq queue behaves like a FIFO queue: messages enter the queue in sequence and are delivered to consumers in the same order. To get started, create a new solution called RabbitMqExample and add the following projects.
- An Asp.Net Core Web Application project called RabbitMqExample.Producer, which acts as the Producer.
- An Asp.Net Core Web Application project called RabbitMqExample.Consumer, which acts as the Consumer.
- A .Net Core Class Library project called RabbitMqExample.Common, which contains the services and models shared between the Producer and the Consumer.
First, in the Common layer, we create a class that holds the RabbitMq settings read from appsettings.json.
public class RabbitMqConfiguration
{
    public string HostName { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
}
Then we create a service to communicate with RabbitMq:
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

public interface IRabbitMqService
{
    // Despite its name, this returns a connection; channels (IModel) are created from it.
    IConnection CreateChannel();
}

public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _configuration;

    public RabbitMqService(IOptions<RabbitMqConfiguration> options)
    {
        _configuration = options.Value;
    }

    public IConnection CreateChannel()
    {
        var factory = new ConnectionFactory
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName,
            // Required for AsyncEventingBasicConsumer; the default is false.
            DispatchConsumersAsync = true
        };
        return factory.CreateConnection();
    }
}
In the CreateChannel method, we supply the information needed to connect to RabbitMq (host name, username and password), which is read from appsettings.json. Note that, despite its name, the method returns a connection (IConnection); the channel (IModel) is created from that connection later. The default username and password are both "guest".
If you want your Consumer to receive queue data asynchronously, you must set the DispatchConsumersAsync property of ConnectionFactory to true. The default value is false.
Next we create a class to register the services in the Common layer.
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class StartupExtension
{
    public static void AddCommonService(this IServiceCollection services, IConfiguration configuration)
    {
        // Bind the "RabbitMqConfiguration" section of appsettings.json to the options class.
        services.Configure<RabbitMqConfiguration>(a => configuration.GetSection(nameof(RabbitMqConfiguration)).Bind(a));
        services.AddSingleton<IRabbitMqService, RabbitMqService>();
    }
}
Packages required for this layer:
<ItemGroup>
    <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
    <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
    <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
    <PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
</ItemGroup>
That completes the Common layer. Next we need to create a Consumer and a Producer.
In the Consumer layer, we create a service called ConsumerService to receive data from RabbitMq:
using System;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public interface IConsumerService
{
    Task ReadMessgaes();
}

public class ConsumerService : IConsumerService, IDisposable
{
    private const string _queueName = "User";
    private readonly IModel _model;
    private readonly IConnection _connection;

    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateChannel();
        _model = _connection.CreateModel();
        // Declare the queue and the exchange, then bind them together.
        _model.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
        _model.ExchangeDeclare("UserExchange", ExchangeType.Fanout, durable: true, autoDelete: false);
        _model.QueueBind(_queueName, "UserExchange", string.Empty);
    }

    public async Task ReadMessgaes()
    {
        var consumer = new AsyncEventingBasicConsumer(_model);
        consumer.Received += async (ch, ea) =>
        {
            var body = ea.Body.ToArray();
            var text = Encoding.UTF8.GetString(body);
            Console.WriteLine(text);
            await Task.CompletedTask; // placeholder for real asynchronous work
            // Acknowledge manually, because autoAck is false below.
            _model.BasicAck(ea.DeliveryTag, false);
        };
        // autoAck is false, so a message is removed only after BasicAck is called.
        _model.BasicConsume(_queueName, false, consumer);
        await Task.CompletedTask;
    }

    public void Dispose()
    {
        if (_model.IsOpen)
            _model.Close();
        if (_connection.IsOpen)
            _connection.Close();
    }
}
First we create the connection using the CreateChannel method that we implemented in the previous service, and then create a channel (IModel) from it.
After creating the IModel, we need to declare the queue, which we do with the QueueDeclare method.
QueueDeclare method parameters:
- The first parameter is the queue name.
- The durable parameter specifies whether the queue survives a broker restart. If true, the queue definition is stored on disk; if false, the queue is lost when RabbitMq restarts. Note that the messages themselves survive a restart only if they are also published as persistent. In environments where the data is important, set this parameter to true.
- The exclusive parameter specifies whether the queue is restricted to the connection that declared it. An exclusive queue is deleted when that connection closes.
- If the autoDelete parameter is true, the queue is deleted when its last Consumer disconnects. If it is false, the queue remains even if no Consumer is connected to it.
Next we need to declare the exchange that the queue is bound to. The ExchangeDeclare method creates an Exchange. ExchangeDeclare method parameters:
- The exchange name.
- The exchange type, which can be Headers, Topic, Fanout or Direct. A Fanout exchange sends every message it receives to all the queues bound to it. A Direct exchange sends a message only to the queues whose binding key matches the message's routing key.
- The next parameters are durable and autoDelete, which act like the corresponding QueueDeclare parameters.
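The difference between Fanout and Direct routing can be sketched with a small in-memory model. This is only a toy illustration and does not use the RabbitMQ.Client API: ToyExchange, ToyExchangeType, Bind and Route are hypothetical names, and the real broker performs this matching on the server side.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of exchange routing. This is NOT the RabbitMQ.Client API;
// every name here is hypothetical and exists only for illustration.
public enum ToyExchangeType { Fanout, Direct }

public class ToyExchange
{
    private readonly ToyExchangeType _type;

    // Each binding pairs a queue name with the key supplied when it was bound.
    private readonly List<(string Queue, string BindingKey)> _bindings =
        new List<(string Queue, string BindingKey)>();

    public ToyExchange(ToyExchangeType type)
    {
        _type = type;
    }

    public void Bind(string queue, string bindingKey)
    {
        _bindings.Add((queue, bindingKey));
    }

    // Returns the names of the queues that would receive a message
    // published with the given routing key.
    public IReadOnlyList<string> Route(string routingKey)
    {
        IEnumerable<(string Queue, string BindingKey)> matches;
        if (_type == ToyExchangeType.Fanout)
            matches = _bindings; // fanout ignores the routing key
        else
            matches = _bindings.Where(b => b.BindingKey == routingKey); // direct needs an exact match

        return matches.Select(b => b.Queue).Distinct().ToList();
    }
}
```

With two queues bound, the fanout model returns both queues for any routing key, while the direct model returns only the queue whose binding key equals the routing key.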
Then, using the QueueBind method, we bind the queue to the exchange. The first parameter is the queue name, the second is the exchange name, and the third is the routing key. Because the exchange is of the Fanout type, which ignores the routing key, we leave it empty.
Because we set the DispatchConsumersAsync property of ConnectionFactory to true when creating the connection, we must use AsyncEventingBasicConsumer here instead of EventingBasicConsumer. If DispatchConsumersAsync is false, you must use EventingBasicConsumer to create a Consumer.
Then we need to implement the event handler that receives data from the queue. The Received event fires when RabbitMq delivers a message to the Consumer. The message body arrives as bytes, so we convert it to a string with the GetString method and write it to the console.
We then use the BasicAck method to tell RabbitMq that the Consumer has received and processed the message. This sends an acknowledgement to RabbitMq, which then removes the message from the queue. If we do not call this method, the messages stay in the queue and are delivered again every time the program runs, until an acknowledgement is sent.
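One way to make the acknowledgement depend on the outcome of processing is sketched below. AckHelper and ProcessAsync are hypothetical names, not part of RabbitMQ.Client; the ack and nack callbacks are assumed to be supplied by the caller, so the helper itself has no dependency on the broker.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of RabbitMQ.Client): runs a message handler and
// reports the outcome through the ack/nack callbacks supplied by the caller.
public static class AckHelper
{
    public static async Task<bool> ProcessAsync(
        string message,
        Func<string, Task> handler,
        Action ack,
        Action nack)
    {
        try
        {
            await handler(message);
        }
        catch (Exception)
        {
            nack(); // processing failed, so tell the broker the message was not handled
            return false;
        }

        ack(); // acknowledge only after the handler has finished successfully
        return true;
    }
}
```

Inside the Received handler, the callbacks could be wired to the channel, for example with ack calling _model.BasicAck(ea.DeliveryTag, false) and nack calling _model.BasicNack(ea.DeliveryTag, false, true). Requeueing on failure is only one possible policy: a message that always fails would be redelivered indefinitely, so a real application may prefer to reject it without requeueing.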
The last step in the Consumer is the BasicConsume method, which registers the created Consumer with RabbitMq. To register the Consumer and start receiving data, the ReadMessgaes method must be called once. That is why we create a HostedService that calls it once at startup:
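using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

public class ConsumerHostedService : BackgroundService
{
    private readonly IConsumerService _consumerService;

    public ConsumerHostedService(IConsumerService consumerService)
    {
        _consumerService = consumerService;
    }

    // Runs once when the host starts and registers the Consumer with RabbitMq.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _consumerService.ReadMessgaes();
    }
}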
Finally, we register the created services in the Startup class of the Consumer layer:
public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; set; }

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddCommonService(Configuration);
        services.AddSingleton<IConsumerService, ConsumerService>();
        services.AddHostedService<ConsumerHostedService>();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
    }
}
The Consumer is now complete, and we have to implement the Producer part.
In the Producer layer, we create a controller called RabbitController, which contains a method that sends data to the queue:
using System.Text;
using Microsoft.AspNetCore.Mvc;

[Route("api/[controller]/[action]")]
[ApiController]
public class RabbitController : ControllerBase
{
    private readonly IRabbitMqService _rabbitMqService;

    // The constructor name must match the class name.
    public RabbitController(IRabbitMqService rabbitMqService)
    {
        _rabbitMqService = rabbitMqService;
    }

    [HttpPost]
    public IActionResult SendMessage()
    {
        // A new connection and channel are opened for every request and disposed afterwards.
        using var connection = _rabbitMqService.CreateChannel();
        using var model = connection.CreateModel();
        var body = Encoding.UTF8.GetBytes("Hi");
        model.BasicPublish("UserExchange",
            string.Empty,
            basicProperties: null,
            body: body);
        return Ok();
    }
}
In the SendMessage method, we first connect to RabbitMq and then send the string "Hi", encoded as bytes, to RabbitMq using the BasicPublish method.
The first parameter is the name of the Exchange, the second is the routing key, and body is the data to send. Finally, we register the services for the Producer layer in the Producer's Startup class:
public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; set; }

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();
        services.AddCommonService(Configuration);
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapDefaultControllerRoute();
        });
    }
}
Now, if you run both projects and call the SendMessage action of the Rabbit controller (a POST request to api/Rabbit/SendMessage), you will see the sent string on the Consumer console.
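The Producer above opens a new connection and channel for every request. As a possible refinement, the sketch below keeps one long-lived resource, recreates it if it has been closed, and lets only one thread use it at a time, since a channel (IModel) should not be used for publishing by several threads at once. SharedResource and Use are hypothetical names, not part of RabbitMQ.Client, and the class is generic so that it does not depend on the broker.

```csharp
using System;

// Hypothetical helper (not part of RabbitMQ.Client): keeps one long-lived
// resource, recreates it when it reports itself closed, and serializes access.
public sealed class SharedResource<T> where T : class
{
    private readonly Func<T> _create;
    private readonly Func<T, bool> _isOpen;
    private readonly object _gate = new object();
    private T _current;

    public SharedResource(Func<T> create, Func<T, bool> isOpen)
    {
        _create = create;
        _isOpen = isOpen;
    }

    // Runs the action while holding the lock, so only one thread
    // uses the resource at any moment.
    public void Use(Action<T> action)
    {
        lock (_gate)
        {
            if (_current == null || !_isOpen(_current))
            {
                // Release the closed instance, if any, before replacing it.
                (_current as IDisposable)?.Dispose();
                _current = _create();
            }
            action(_current);
        }
    }
}
```

Under these assumptions, a singleton SharedResource<IModel> could be built from a single connection, with create calling connection.CreateModel() and isOpen checking m.IsOpen, and the controller would publish through Use instead of opening a connection per request. Recovering a closed connection, as opposed to a closed channel, would need the same treatment one level up.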
The appsettings.json file for the Consumer and Producer projects:
{
    "RabbitMqConfiguration": {
        "HostName": "localhost",
        "Username": "guest",
        "Password": "guest"
    }
}
docker-compose.yml file to run RabbitMq on Docker:
version: "3.2"
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: 'rabbitmq'
    ports:
      - 5672:5672
      - 15672:15672
You can download the source code of this article from GitHub.
Thank you for your good training
Posted at Tuesday, April 19, 2022

Thanks for the excellent article!
Posted at Sunday, December 11, 2022

Is it thread safe to use model.BasicPublish() in this example?
IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.
This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.
If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself:
IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
    ch.BasicPublish(...);
}
Source: https://www.rabbitmq.com/client-libraries/dotnet-api-guide#concurrency
Posted at Monday, April 22, 2024

Thank you for your good training
Posted at Saturday, May 18, 2024

Useful starter code. I would just like to point out that according to the docs (found at https://www.rabbitmq.com/docs/publishers#lifecycle):
"Publishers are often long lived: that is, throughout the lifetime of a publisher it publishes multiple messages. Opening a connection or channel (session) to publish a single message is not optimal. Publishers usually open their connection(s) during application startup. They often would live as long as their connection or even application runs."
So the CreateChannel() method should have some logic that opens a connection and channel once, and again if closed for some reason.
Posted at Tuesday, August 13, 2024

Thanks for your note, I'll update my post
Posted at Friday, August 16, 2024