如何独立于其他人配置某些类型/组的消费者?

Posted

技术标签:

【中文标题】如何独立于其他人配置某些类型/组的消费者?【英文标题】:How to configure some types/groups of consumers independently of others? 【发布时间】:2022-01-24 02:26:26 【问题描述】:

目前我使用以下代码添加所有消费者并配置端点:

services.AddMassTransit(x =>

    x.AddConsumers(assemblyToLoadConsumersFrom);

    x.UsingRabbitMq((context, cfg) =>
    
        cfg.Host(new Uri(settings.ConnectionString));

        cfg.ConfigureEndpoints(context);
    );
);

但是,现在我有不同类型的消费者,这些消费者由抽象类定义,如 CommandResponseConsumerBaseQueryConsumerBase(这些抽象类实现 IConsumer 接口,实际消费者实现继承自这些类)。每个消费者实现都需要构造函数中的一些参数由 DI 注入(使用 Microsoft DI 和上面的代码正确地将参数注入消费者构造函数)。

问题是,例如对于QueryConsumerBase 类型的所有消费者,我想设置endpointConfiguration.DiscardFaultedMessages(),但为其他消费者(如CommandResponseConsumerBase)保留默认值(而不是丢弃)。我还想保留默认拓扑/端点名称,如上面代码分配的名称。

另外我不想单独配置消费者,因为我有很多消费者,而且很乏味。相反,我只想在程序集中找到所有 CommandResponseConsumerBase 消费者并注册它们并一次性配置它们,然后在程序集中找到所有 QueryConsumerBase 并注册它们并一次配置它们。

代码如下所示:

services.AddMassTransit(x =>

    // x.AddConsumers(assemblyToLoadConsumersFrom);

    assemblyToLoadConsumersFrom
        .GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>)
        .ForEach(impl =>
        
            x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
            // but how to configure consumer here?
        );

    x.UsingRabbitMq((context, cfg) =>
    
        cfg.Host(new Uri(settings.ConnectionString));

        cfg.ConfigureEndpoints(context);
    );
);

x.AddConsumer(impl.ImplementedType) 方法允许我添加消费者,但我无法传递 ConsumerTypeDefinition,因为我没有任何定义,我不想为每个消费者定义一个。也不能使用x.AddConsumer&lt;T&gt;(...),因为我在动态变量中有消费者类型。也试过x.AddConsumer(impl.ImplementedType).Endpoint(e =&gt; e.?),但端点配置器只允许更改名称、临时设置和其他一些设置,但不能更改DiscardFaultedMessages()

使用 MassTransit / AspNetCore / RabbitMq 7.2.4


最终解决方案(感谢 Chris):

// Actually this definition class is not even aware of exact 
// consumer type.

public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
    where TConsumer : class, IConsumer

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<TConsumer> consumerConfigurator)
    
        endpointConfigurator.DiscardFaultedMessages();
    

assemblyToLoadConsumersFrom!
    .GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
    .ToList()
    .ForEach(impl =>
    
        // create definition type that is aware of exact consumer type
        var definitionType = typeof(CommandResponseConsumerDefinition<>)
            .MakeGenericType(impl.ImplementedType);

        // Add consumer type along with definition type
        x.AddConsumer(impl.ImplementedType, definitionType);
    );

【问题讨论】:

【参考方案1】:

您也许可以创建两个通用消费者定义,一个用于查询库,一个用于命令库,将定义的实现通用类型与消费者一起注册(调用AddConsumer 时)。

类似:

public class QueryConsumerDefinition<TConsumer> :
    ConsumerDefinition<TConsumer>
    where TConsumer : QueryConsumerBase

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
    
        endpointConfigurator.DiscardFaultedMessages();
    

【讨论】:

由于我的 QueryConsumerBase(和其他人)采用更通用的类型参数,我找到了其他非常相似的解决方案(请参阅我上次编辑的解决方案)-谢谢!

以上是关于如何独立于其他人配置某些类型/组的消费者?的主要内容,如果未能解决你的问题,请参考以下文章

Lmax Disruptor,许多消费者 - 如何让消费者只接受特定类型的消息并独立进行?

Consumer.endOffsets 如何在 Kafka 中工作?

来自不同消费者组的多个消费者如何从同一个分区中读取?

聊聊 Kafka:如何避免消费组的 Rebalance

聊聊 Kafka:如何避免消费组的 Rebalance

聊聊 Kafka:如何避免消费组的 Rebalance