MassTransit / RabbitMQ - 为啥跳过这么多消息?
Posted
技术标签:
【中文标题】MassTransit / RabbitMQ - 为啥跳过这么多消息?【英文标题】:MassTransit / RabbitMQ - why so many messages get skipped?MassTransit / RabbitMQ - 为什么跳过这么多消息? 【发布时间】:2019-09-26 13:04:58 【问题描述】:我正在使用 MassTransit/RabbitMQ 在生产者/消费者场景中使用 2 个 .NET Core 控制台应用程序。我需要确保即使没有消费者启动并运行,来自生产者的消息仍然成功排队。这似乎不适用于 Publish() - 消息刚刚消失,所以我使用 Send() 代替。消息至少会排队,但如果没有任何消费者运行消息,所有消息都会最终进入“_skipped”队列。
这是我的第一个问题:这是基于需求的正确方法吗(即使没有消费者启动并运行,来自生产者的消息仍然成功排队)?
使用 Send(),我的消费者确实可以工作,但仍有许多消息从裂缝中掉入并被转储到“_skipped”队列中。消费者的逻辑很少(此时只是记录消息),所以它不是一个长时间运行的过程。
这是我的第二个问题:为什么还有这么多消息被转储到“_skipped”队列中?
这就引出了我的第三个问题:这是否意味着我的消费者也需要收听“_skipped”队列?
我不确定您需要为这个问题查看什么代码,但这里是 RabbitMQ 管理 UI 的屏幕截图:
生产者配置:
static IHostBuilder CreateHostBuilder(string[] args)
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
cfg.AddBus(ConfigureBus);
);
services.AddHostedService<CardMessageProducer>();
)
.UseConsoleLifetime()
.UseSerilog();
static IBusControl ConfigureBus(IServiceProvider provider)
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
);
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
EndpointConvention.Map<CardMessage>(e.InputAddress);
);
);
生产者代码:
Bus.Send(message);
消费者配置:
static IHostBuilder CreateHostBuilder(string[] args)
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
services.AddSingleton<CardMessageConsumer>();
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
cfg.AddBus(ConfigureBus);
);
services.AddHostedService<MassTransitHostedService>();
)
.UseConsoleLifetime()
.UseSerilog();
static IBusControl ConfigureBus(IServiceProvider provider)
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
);
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
e.Consumer<CardMessageConsumer>(provider);
);
//cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
//
// e.Consumer<CardMessageConsumer>(provider);
//);
);
消费者代码:
class CardMessageConsumer : IConsumer<CardMessage>
private readonly ILogger<CardMessageConsumer> logger;
private readonly ApplicationConfiguration configuration;
private long counter;
public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
this.logger = logger;
this.configuration = options.Value;
public async Task Consume(ConsumeContext<CardMessage> context)
this.counter++;
this.logger.LogTrace($"Message #this.counter consumed: context.Message");
【问题讨论】:
【参考方案1】:在 MassTransit 中,_skipped
队列是 dead letter queue 概念的实现。消息到达那里是因为它们没有被消费。
带有 RMQ 的 MassTransit 始终将消息传递到 exchange,而不是 queue。默认情况下,每个 MassTransit 端点创建(如果没有现有队列)具有端点名称的队列、具有相同名称的交换并将它们绑定在一起。当应用程序具有配置的使用者(或处理程序)时,该消息类型的交换(使用消息类型作为交换名称)也被创建,并且端点交换被绑定到消息类型交换。因此,当您使用Publish
时,消息将发布到消息类型交换器,并使用端点绑定(或多个绑定)相应地传递。当您使用Send
时,没有使用消息类型交换,因此消息直接到达目标交换。而且,正如@maldworth 正确指出的那样,每个 MassTransit 端点只希望获得它可以使用的消息。如果它不知道如何使用消息 - 消息将被移动到死信队列。这与有害消息队列一样,都是消息传递的基本模式。
如果您需要排队等待稍后使用的消息,最好的方法是设置接线,但端点本身(我的意思是应用程序)不应该运行。一旦应用程序启动,它将消耗所有排队的消息。
【讨论】:
使用 RMQ 的 MassTransit 总是将消息传递到交换器,而不是队列。 那么在 MassTransit 中发送和发布消息有什么不同呢?【参考方案2】:当消费者启动总线bus.Start()
时,它所做的一件事就是为传输创建所有交换和队列。如果您要求在消费者之前发布/发送,您唯一的选择是运行 DeployTopologyOnly。不幸的是,官方文档中没有记录此功能,但单元测试在这里:https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/BuildTopology_Specs.cs
当消息发送给不知道如何处理的消费者时,会发生跳过队列。
例如,如果您有一个可以处理接收端点名称“my-queue-a”的IConsumer<MyMessageA>
的消费者。但是你的消息生产者会Send<MyMessageB>(Uri("my-queue-a")...)
,这是一个问题。消费者只了解 A,它不知道如何处理 B。因此它只是将其移动到跳过的队列并继续。
【讨论】:
就我而言,这个概念验证中只有一种类型的消息。如果我让消费者订阅两个队列(标准队列和“_skipped”队列),它会尽快处理所有消息。 您不应该让消费者订阅“_skipped”队列。或者“_error”队列。这表明某些东西没有正确配置/接线。通常,如果您想重新处理 _error 队列中的内容,请使用 rabbitmq 铲子插件。 根据我添加的代码(上图),有没有什么特别突出的地方没有正确配置/接线?以上是关于MassTransit / RabbitMQ - 为啥跳过这么多消息?的主要内容,如果未能解决你的问题,请参考以下文章
我们可以通过 masstransit 一起使用 RabbitMQ 和 Mediatr 吗?
是否可以使用 MassTransit 为 RabbitMQ 队列注册多个消费者?
.Net Core RabbitMQ/Masstransit 同一应用程序中每个可配置线程数一个消费者