在 c# 中使用依赖注入的 RabbitMQ 工作队列
Posted
技术标签:
【中文标题】在 c# 中使用依赖注入的 RabbitMQ 工作队列【英文标题】:RabbitMQ work queues using Dependency Injection in c# 【发布时间】:2020-12-07 01:30:40 【问题描述】:我在“工作队列”场景中使用 rabbitmq。
我需要例如。由 5 个消费者组成的池(每个消费者都有自己的通道),因此一个消费者进行 I/O 操作,不会阻塞同一队列的其他消费者。
例如。 如果我的队列中有: 消息 1、消息 2、消息 3、消息 4。(FistConsumerHandler) 的每个实例将使用轮询从队列中获取 1 条消息(默认 rabbitmq 行为)
我面临的问题是我需要使用依赖注入来做到这一点。
这是我目前所拥有的:
在 Windows 服务启动时(我的消费者托管在 Windows 服务中):
protected override void OnStart(string[] args)
BuildConnections();
// Register the consumers. For simplicity only showing FirstConsumerHandler.
AddConsumerHandlers<FistConsumerHandler>(ConstantesProcesos.Exchange, ConstantesProcesos.QueueForFirstHandler);
BuildStartup();
var logger = GetLogger<ServicioProcesos>();
logger.LogInformation("Windows Service Started");
Console.WriteLine("Press [enter] to exit.");
protected virtual void BuildConnections(
string notificationHubPath = "notificationhub_path",
string rabbitMQHostname = "rabbitmq_hostname",
string rabbitMQPort = "rabbitmq_port",
string rabbitMQUserName = "rabbitmq_username",
string rabbitMQPassword = "rabbitmq_password")
ContextHelpers.Setup(ConfigurationManager.ConnectionStrings[appContextConnectionString].ConnectionString);
if (_connection == null)
var factory = new ConnectionFactory
HostName = ConfigurationManager.AppSettings[rabbitMQHostname],
Port = int.Parse(ConfigurationManager.AppSettings[rabbitMQPort]),
UserName = ConfigurationManager.AppSettings[rabbitMQUserName],
Password = ConfigurationManager.AppSettings[rabbitMQPassword],
DispatchConsumersAsync = true,
;
// Create a connection
do
try
_connection = factory.CreateConnection();
catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e)
Thread.Sleep(5000);
while (_connection == null);
_startupBuilder = new StartupBuilder(_connection);
protected void AddConsumerHandlers<THandler>(string exchange, string queue)
var consumerHandlerItem = new ConsumerHandlerItem
ConsumerType = typeof(THandler),
Exchange = exchange,
Queue = queue
;
_startupBuilder._consumerHandlerItems.Add(consumerHandlerItem);
protected void BuildStartup()
ServiceProvider = _startupBuilder.Build();
启动生成器:
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
public class StartupBuilder
private static IConnection _connection;
private IModel _channel;
public List<ConsumerHandlerItem> _consumerHandlerItems;
public IServiceCollection Services get; private set;
public StartupBuilder(IConnection connection)
_connection = connection;
_consumerHandlerItems = new List<ConsumerHandlerItem>();
Services = new ServiceCollection();
public IServiceProvider Build()
_channel = _connection.CreateModel();
Services.InitSerilog();
// Add channel as singleton (this is not correct as I need 1 channel per ConsumerHandler)
Services.AddSingleton(_channel);
// Register the ConsumerHandler to DI
foreach (var item in _consumerHandlerItems)
// Add FirstHandler to DI
Type consumerType = item.ConsumerType;
Services.AddSingleton(consumerType);
// Finish DI Setup
var serviceProvider = Services.BuildServiceProvider();
// Bind the consumer handler to the channel and queue
foreach (var item in _consumerHandlerItems)
var consumerHandler = (AsyncEventingBasicConsumer)serviceProvider.GetRequiredService(item.ConsumerType);
_channel.AssignNewProcessor(item, consumerHandler);
return serviceProvider;
助手:
public static class QueuesHelpers
public static void AssignNewProcessor(this IModel channel, ConsumerHandlerItem item, AsyncEventingBasicConsumer consumerHandler)
channel.ExchangeDeclare(item.Exchange, ExchangeType.Topic, durable: true);
channel.QueueDeclare(item.Queue, true, false, false, null);
channel.QueueBind(item.Queue, item.Exchange, item.Queue, null);
channel.BasicConsume(item.Queue, false, consumerHandler);
消费者处理程序:
public class FistConsumerHandler : AsyncEventingBasicConsumer
private readonly ILogger<FistConsumerHandler> _logger;
private Guid guid = Guid.NewGuid();
public FistConsumerHandler(
IModel channel,
ILogger<FistConsumerHandler> logger) : base(channel)
Received += ConsumeMessageAsync;
_logger = logger;
private async Task ConsumeMessageAsync(object sender, BasicDeliverEventArgs eventArgs)
try
// consumer logic to consume the message
catch (Exception ex)
finally
Model.Acknowledge(eventArgs);
这段代码的问题是:
-
只有 1 个 FistConsumerHandler 实例(重新注册为单例)。例如,我需要 5 个。
我只有 1 个频道,每个实例需要 1 个频道。
综上所述,使用 Microsoft.Extensions.DependencyInjection 的预期行为应该是:
-
创建连接(与所有消费者共享此连接)
当消息被接收到队列时,它应该被 1 个消费者使用自己的通道消费
如果队列收到另一条消息,则应该由另一个消费者使用它
【问题讨论】:
【参考方案1】:TL;DR;创建自己的范围
我在我正在开发的应用程序中做了类似的事情,尽管没有我想要的那么干净(这也是我遇到这篇文章的原因)。对我来说,关键是使用IServiceScopeFactory
获取注入服务并在消费者方法中使用它们。在典型的 HTTP 请求中,API 将分别在请求进入/响应发出时自动为您创建/关闭范围。但由于这不是 HTTP 请求,我们需要创建/关闭使用注入服务的范围。
这是获取注入的 DB 上下文(但可以是任何内容)的简化示例,假设我已经设置了 RabbitMQ 消费者,将消息反序列化为对象(在此示例中为 FooEntity
):
public class RabbitMQConsumer
private readonly IServiceProvider _provider;
public RabbitMQConsumer(IServiceProvider serviceProvider)
this._serviceProvider = serviceProvider;
public async Task ConsumeMessageAsync()
// Using statement ensures we close scope when finished, helping avoid memory leaks
using (var scope = this._serviceProvider.CreateScope())
// Get your service(s) within the scope
var context = scope.ServiceProvider.GetRequiredService<MyDBContext>();
// Do things with dbContext
请务必将RabbitMQConsumer
注册为单例,而不是Startup.cs
中的瞬态。
参考资料:
Similar SO post MS Docs【讨论】:
以上是关于在 c# 中使用依赖注入的 RabbitMQ 工作队列的主要内容,如果未能解决你的问题,请参考以下文章