MassTransit:在消费者消费完所有消息后如何停止公共汽车?
Posted
技术标签:
【中文标题】MassTransit:在消费者消费完所有消息后如何停止公共汽车?【英文标题】:MassTransit: How to stop the bus after all messages are consumed by consumer? 【发布时间】:2019-09-25 21:44:56 【问题描述】:我尝试在使用 MassTransit 消耗队列中的所有消息后停止总线。我将并发消息限制设置为 1,因为我的消费者需要一次处理一条消息。
我尝试将 bus.StopAsync() 放在 bus.StartAsync 后面,如下所示。结果显示,一条消息被消费后,总线就会停止。
总线配置:
IBusControl bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
IRabbitMqHost host = cfg.Host(new Uri("rabbitmq://localhost"), hostConfigurator =>
hostConfigurator.Username("username");
hostConfigurator.Password("password");
);
cfg.ReceiveEndpoint(host, "MyResult", ep =>
ep.Bind("MyExchange", s => s.Durable = true; );
ep.Consumer<MessageConsumer>(mc =>
mc.UseConcurrentMessageLimit(1);
);
);
);
公交车起停:
await bus.StartAsync();
await bus.StopAsync();
我的问题是如何在队列中的所有消息都被消耗后停止总线。我对 MassTransit 很陌生,对调用消费者和停止公共汽车的顺序非常好奇。感谢有人可以提供帮助。谢谢。
【问题讨论】:
【参考方案1】:在 Testing 命名空间中,有一个功能用于监视总线上的活动,可用于发出没有消息被消费的信号(之后,您可以按照您的建议停止总线)。
你可以看到单元测试: https://github.com/MassTransit/MassTransit/blob/v7.0.3/tests/MassTransit.Tests/BusActivityMonitor_Specs.cs#L53
观察者被添加使用:
var activityMonitor = bus.CreateBusActivityMonitor(TimeSpan.FromMilliseconds(500));
一旦总线空闲,超时就会为真:
var timeout = await activityMonitor.AwaitBusInactivity(TimeSpan.FromSeconds(10));
如果timeout
为真,则在指定时间内总线上没有活动。
【讨论】:
以上是关于MassTransit:在消费者消费完所有消息后如何停止公共汽车?的主要内容,如果未能解决你的问题,请参考以下文章
如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器
普通消费者可以在 MassTransit 中重试消息之前延长超时时间吗?
MassTransit / RabbitMQ - 为啥跳过这么多消息?