MassTransit:在消费者消费完所有消息后如何停止公共汽车?

Posted

技术标签:

【中文标题】MassTransit:在消费者消费完所有消息后如何停止公共汽车?【英文标题】:MassTransit: How to stop the bus after all messages are consumed by consumer? 【发布时间】:2019-09-25 21:44:56 【问题描述】:

我尝试在使用 MassTransit 消耗队列中的所有消息后停止总线。我将并发消息限制设置为 1,因为我的消费者需要一次处理一条消息。

我尝试将 bus.StopAsync() 放在 bus.StartAsync 后面,如下所示。结果显示,一条消息被消费后,总线就会停止。

总线配置:

IBusControl bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            
                IRabbitMqHost host = cfg.Host(new Uri("rabbitmq://localhost"), hostConfigurator =>
                
                    hostConfigurator.Username("username");
                    hostConfigurator.Password("password");
                );

            cfg.ReceiveEndpoint(host, "MyResult", ep =>
            
                ep.Bind("MyExchange", s =>  s.Durable = true; );

                ep.Consumer<MessageConsumer>(mc =>
                
                    mc.UseConcurrentMessageLimit(1);
                );
            );
        );

公交车起停:

await bus.StartAsync();

await bus.StopAsync();

我的问题是如何在队列中的所有消息都被消耗后停止总线。我对 MassTransit 很陌生,对调用消费者和停止公共汽车的顺序非常好奇。感谢有人可以提供帮助。谢谢。

【问题讨论】:

【参考方案1】:

在 Testing 命名空间中,有一个功能用于监视总线上的活动,可用于发出没有消息被消费的信号(之后,您可以按照您的建议停止总线)。

你可以看到单元测试: https://github.com/MassTransit/MassTransit/blob/v7.0.3/tests/MassTransit.Tests/BusActivityMonitor_Specs.cs#L53

观察者被添加使用:

var activityMonitor = bus.CreateBusActivityMonitor(TimeSpan.FromMilliseconds(500));

一旦总线空闲,超时就会为真:

var timeout = await activityMonitor.AwaitBusInactivity(TimeSpan.FromSeconds(10));

如果timeout 为真,则在指定时间内总线上没有活动。

【讨论】:

以上是关于MassTransit:在消费者消费完所有消息后如何停止公共汽车?的主要内容,如果未能解决你的问题,请参考以下文章

如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器

普通消费者可以在 MassTransit 中重试消息之前延长超时时间吗?

masstransit请求/响应:在消费者中获取调用者超时

MassTransit / RabbitMQ - 为啥跳过这么多消息?

是否可以使用 MassTransit 为 RabbitMQ 队列注册多个消费者?

未登录 MassTransit Mediator 的异常