为啥我有以下错误(MassTransit)

Posted

技术标签:

【中文标题】为啥我有以下错误(MassTransit)【英文标题】:Why do I have the following error (MassTransit)为什么我有以下错误(MassTransit) 【发布时间】:2022-01-23 20:47:27 【问题描述】:

错误是 - ConfigurationException:已添加具有相同密钥的接收端点:事件

我有 appsettings.Development.json

"EventsBusOptions": 
    "HostUri": "rabbitmq://rabbitmq.test.com/gate",
    "UserName": "xxx",
    "Password": "xxxxxx",
    "QueueName": "events", //<<< if is change queue name some different string e.g. "events1" - NO error
    "PrefetchCount": 16,
    "UseConcurrencyLimit": 15
  

和 Startup.cs(带有 MultiBus 配置)

    services.AddMassTransit<IEventsBus>(x =>
    
        x.UsingRabbitMq((context, cfg) =>
        
            var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;

            cfg.Host(new Uri(_options.HostUri), h =>
            
                h.Username(_options.UserName);
                h.Password(_options.Password);
            );

            cfg.ReceiveEndpoint(_options.QueueName, ep =>
            
                ep.Consumer<EventsConsumer>(context);
                ep.PrefetchCount = _options.PrefetchCount ?? 15;
                ep.UseConcurrencyLimit(_options.UseConcurrencyLimit ?? 16);
            );

            cfg.ConfigureEndpoints(context);
        );

        x.AddConsumer<EventsConsumer>();

    );

为什么我在使用 "QueueName": "events" 时出现错误?

【问题讨论】:

【参考方案1】:

因为您使用 错误 方法来配置使用者。应该使用ConfigureConsumer 而不仅仅是Consumer,如下面的更新配置所示。

services.AddMassTransit<IEventsBus>(x =>

    x.AddConsumer<EventsConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    
        var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;

        cfg.Host(new Uri(_options.HostUri), h =>
        
            h.Username(_options.UserName);
            h.Password(_options.Password);
        );

        cfg.ReceiveEndpoint(_options.QueueName, ep =>
        
            ep.PrefetchCount = _options.PrefetchCount ?? 16;
            ep.ConcurrentMessageLimit = _options.ConcurrentMessageLimit ?? 16;

            ep.ConfigureConsumer<EventsConsumer>(context);
        );

        cfg.ConfigureEndpoints(context);
    );
);

注意我还修复了您的并发消息限制配置以使用内置限制器,而不是添加过滤器。

另外您可以省略 ConfigureEndpoints,因为您正在为消费者手动配置接收端点。

【讨论】:

以上是关于为啥我有以下错误(MassTransit)的主要内容,如果未能解决你的问题,请参考以下文章

MassTransit / RabbitMQ - 为啥跳过这么多消息?

RabbitMQ 和 MassTransit 之间的连接

注册IConsumer 使用Autofac在Masstransit中依赖

如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器

MassTransit 与 Kafka 和 NodaTime

如何在实体框架中持久保存 MassTransit 状态数据?