如何独立于其他人配置某些类型/组的消费者?
Posted
技术标签:
【中文标题】如何独立于其他人配置某些类型/组的消费者?【英文标题】:How to configure some types/groups of consumers independently of others? 【发布时间】:2022-01-24 02:26:26 【问题描述】:目前我使用以下代码添加所有消费者并配置端点:
services.AddMassTransit(x =>
x.AddConsumers(assemblyToLoadConsumersFrom);
x.UsingRabbitMq((context, cfg) =>
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
);
);
但是,现在我有不同类型的消费者,这些消费者由抽象类定义,如 CommandResponseConsumerBase
和 QueryConsumerBase
(这些抽象类实现 IConsumer
接口,实际消费者实现继承自这些类)。每个消费者实现都需要构造函数中的一些参数由 DI 注入(使用 Microsoft DI 和上面的代码正确地将参数注入消费者构造函数)。
问题是,例如对于QueryConsumerBase
类型的所有消费者,我想设置endpointConfiguration.DiscardFaultedMessages()
,但为其他消费者(如CommandResponseConsumerBase
)保留默认值(而不是丢弃)。我还想保留默认拓扑/端点名称,如上面代码分配的名称。
另外我不想单独配置消费者,因为我有很多消费者,而且很乏味。相反,我只想在程序集中找到所有 CommandResponseConsumerBase
消费者并注册它们并一次性配置它们,然后在程序集中找到所有 QueryConsumerBase
并注册它们并一次配置它们。
代码如下所示:
services.AddMassTransit(x =>
// x.AddConsumers(assemblyToLoadConsumersFrom);
assemblyToLoadConsumersFrom
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>)
.ForEach(impl =>
x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
// but how to configure consumer here?
);
x.UsingRabbitMq((context, cfg) =>
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
);
);
x.AddConsumer(impl.ImplementedType)
方法允许我添加消费者,但我无法传递 ConsumerTypeDefinition,因为我没有任何定义,我不想为每个消费者定义一个。也不能使用x.AddConsumer<T>(...)
,因为我在动态变量中有消费者类型。也试过x.AddConsumer(impl.ImplementedType).Endpoint(e => e.?)
,但端点配置器只允许更改名称、临时设置和其他一些设置,但不能更改DiscardFaultedMessages()
。
使用 MassTransit / AspNetCore / RabbitMq 7.2.4
最终解决方案(感谢 Chris):
// Actually this definition class is not even aware of exact
// consumer type.
public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
where TConsumer : class, IConsumer
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<TConsumer> consumerConfigurator)
endpointConfigurator.DiscardFaultedMessages();
assemblyToLoadConsumersFrom!
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
.ToList()
.ForEach(impl =>
// create definition type that is aware of exact consumer type
var definitionType = typeof(CommandResponseConsumerDefinition<>)
.MakeGenericType(impl.ImplementedType);
// Add consumer type along with definition type
x.AddConsumer(impl.ImplementedType, definitionType);
);
【问题讨论】:
【参考方案1】:您也许可以创建两个通用消费者定义,一个用于查询库,一个用于命令库,将定义的实现通用类型与消费者一起注册(调用AddConsumer
时)。
类似:
public class QueryConsumerDefinition<TConsumer> :
ConsumerDefinition<TConsumer>
where TConsumer : QueryConsumerBase
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
endpointConfigurator.DiscardFaultedMessages();
【讨论】:
由于我的 QueryConsumerBase(和其他人)采用更通用的类型参数,我找到了其他非常相似的解决方案(请参阅我上次编辑的解决方案)-谢谢!以上是关于如何独立于其他人配置某些类型/组的消费者?的主要内容,如果未能解决你的问题,请参考以下文章
Lmax Disruptor,许多消费者 - 如何让消费者只接受特定类型的消息并独立进行?