MassTransit / RabbitMQ - 为啥跳过这么多消息?

Posted

技术标签:

【中文标题】MassTransit / RabbitMQ - 为啥跳过这么多消息?【英文标题】:MassTransit / RabbitMQ - why so many messages get skipped?MassTransit / RabbitMQ - 为什么跳过这么多消息? 【发布时间】:2019-09-26 13:04:58 【问题描述】:

我正在使用 MassTransit/RabbitMQ 在生产者/消费者场景中使用 2 个 .NET Core 控制台应用程序。我需要确保即使没有消费者启动并运行,来自生产者的消息仍然成功排队。这似乎不适用于 Publish() - 消息刚刚消失,所以我使用 Send() 代替。消息至少会排队,但如果没有任何消费者运行消息,所有消息都会最终进入“_skipped”队列。

这是我的第一个问题:这是基于需求的正确方法吗(即使没有消费者启动并运行,来自生产者的消息仍然成功排队)?

使用 Send(),我的消费者确实可以工作,但仍有许多消息从裂缝中掉入并被转储到“_skipped”队列中。消费者的逻辑很少(此时只是记录消息),所以它不是一个长时间运行的过程。

这是我的第二个问题:为什么还有这么多消息被转储到“_skipped”队列中?

这就引出了我的第三个问题:这是否意味着我的消费者也需要收听“_skipped”队列?


我不确定您需要为这个问题查看什么代码,但这里是 RabbitMQ 管理 UI 的屏幕截图:

生产者配置:

    static IHostBuilder CreateHostBuilder(string[] args)
    
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      
                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          
                              cfg.AddBus(ConfigureBus);
                          );

                          services.AddHostedService<CardMessageProducer>();
                      )
                      .UseConsoleLifetime()
                      .UseSerilog();
    

    static IBusControl ConfigureBus(IServiceProvider provider)
    
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            );

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            
                EndpointConvention.Map<CardMessage>(e.InputAddress);
            );
        );
    

生产者代码:

Bus.Send(message);

消费者配置:

    static IHostBuilder CreateHostBuilder(string[] args)
    
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      
                          services.AddSingleton<CardMessageConsumer>();

                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          
                              cfg.AddBus(ConfigureBus);
                          );

                          services.AddHostedService<MassTransitHostedService>();
                      )
                      .UseConsoleLifetime()
                      .UseSerilog();
    

    static IBusControl ConfigureBus(IServiceProvider provider)
    
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            );

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            
                e.Consumer<CardMessageConsumer>(provider);
            );

            //cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
            //
            //    e.Consumer<CardMessageConsumer>(provider);
            //);
        );
    

消费者代码:

class CardMessageConsumer : IConsumer<CardMessage>

    private readonly ILogger<CardMessageConsumer> logger;
    private readonly ApplicationConfiguration configuration;
    private long counter;

    public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
    
        this.logger = logger;
        this.configuration = options.Value;
    

    public async Task Consume(ConsumeContext<CardMessage> context)
    
        this.counter++;

        this.logger.LogTrace($"Message #this.counter consumed: context.Message");
    

【问题讨论】:

【参考方案1】:

在 MassTransit 中,_skipped 队列是 dead letter queue 概念的实现。消息到达那里是因为它们没有被消费。

带有 RMQ 的 MassTransit 始终将消息传递到 exchange,而不是 queue。默认情况下,每个 MassTransit 端点创建(如果没有现有队列)具有端点名称的队列、具有相同名称的交换并将它们绑定在一起。当应用程序具有配置的使用者(或处理程序)时,该消息类型的交换(使用消息类型作为交换名称)也被创建,并且端点交换被绑定到消息类型交换。因此,当您使用Publish 时,消息将发布到消息类型交换器,并使用端点绑定(或多个绑定)相应地传递。当您使用Send 时,没有使用消息类型交换,因此消息直接到达目标交换。而且,正如@maldworth 正确指出的那样,每个 MassTransit 端点只希望获得它可以使用的消息。如果它不知道如何使用消息 - 消息将被移动到死信队列。这与有害消息队列一样,都是消息传递的基本模式。

如果您需要排队等待稍后使用的消息,最好的方法是设置接线,但端点本身(我的意思是应用程序)不应该运行。一旦应用程序启动,它将消耗所有排队的消息。

【讨论】:

使用 RMQ 的 MassTransit 总是将消息传递到交换器,而不是队列。 那么在 MassTransit 中发送和发布消息有什么不同呢?【参考方案2】:

当消费者启动总线bus.Start() 时,它所做的一件事就是为传输创建所有交换和队列。如果您要求在消费者之前发布/发送,您唯一的选择是运行 DeployTopologyOnly。不幸的是,官方文档中没有记录此功能,但单元测试在这里:https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/BuildTopology_Specs.cs

当消息发送给不知道如何处理的消费者时,会发生跳过队列。

例如,如果您有一个可以处理接收端点名称“my-queue-a”的IConsumer&lt;MyMessageA&gt; 的消费者。但是你的消息生产者会Send&lt;MyMessageB&gt;(Uri("my-queue-a")...),这是一个问题。消费者只了解 A,它不知道如何处理 B。因此它只是将其移动到跳过的队列并继续。

【讨论】:

就我而言,这个概念验证中只有一种类型的消息。如果我让消费者​​订阅两个队列(标准队列和“_skipped”队列),它会尽快处理所有消息。 您不应该让消费者订阅“_skipped”队列。或者“_error”队列。这表明某些东西没有正确配置/接线。通常,如果您想重新处理 _error 队列中的内容,请使用 rabbitmq 铲子插件。 根据我添加的代码(上图),有没有什么特别突出的地方没有正确配置/接线?

以上是关于MassTransit / RabbitMQ - 为啥跳过这么多消息?的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 和 MassTransit 之间的连接

我们可以通过 masstransit 一起使用 RabbitMQ 和 Mediatr 吗?

是否可以使用 MassTransit 为 RabbitMQ 队列注册多个消费者?

.Net Core RabbitMQ/Masstransit 同一应用程序中每个可配置线程数一个消费者

未在代码中指定的交换上的MassTransit ACCESS_REFUSED

MassTransit一个优秀的.NET消息(事件)总线框架