同一消息的多个订阅者 Rebus Azure 服务总线

Posted

技术标签:

【中文标题】同一消息的多个订阅者 Rebus Azure 服务总线【英文标题】:Multiple Subscribers to same message Rebus Azure Service Bus 【发布时间】:2020-07-06 08:54:23 【问题描述】:

我有两个完全相同的消费者

消费者1

using (var adapter = new BuiltinHandlerActivator())
        
            adapter.Handle<string>(async (bus, message) =>
            
                Console.WriteLine("Got message > " + message);

                await bus.Reply("Received in consumer 1");
            );

            Configure.With(adapter)
                .Transport(t => t.UseAzureServiceBus(connectionString, "server"))
                .Start();

            adapter.Bus.Subscribe<string>().Wait();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        

消费者 2

using (var adapter = new BuiltinHandlerActivator())
        
            adapter.Handle<string>(async (bus, message) =>
            
                Console.WriteLine("Got message > " + message);

                await bus.Reply("Received in Consumer 2");
            );

            Configure.With(adapter)
                .Transport(t => t.UseAzureServiceBus(connectionString, "server"))
                .Start();

            adapter.Bus.Subscribe<string>().Wait();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        

制作人

using (var adapter = new BuiltinHandlerActivator())
        
            adapter.Handle<string>(async message =>
            
                Console.WriteLine("Returned > " + message);
            );

            var bus = Configure
                .With(adapter)
                .Transport(t => t.UseAzureServiceBus(connectionString, "client"))
                .Routing(r => r.TypeBased().Map<string>("server"))
                .Start();

            Console.WriteLine("Press Q to quit or any other key to produce a job");
            while (true)
            
                Console.Write("Write something > ");
                var text = Console.ReadLine();

                if (string.IsNullOrWhiteSpace(text)) break;

                bus.Publish(text).Wait();
            
        

我期望每当我从生产者发送消息时,我的两个消费者都会显示该消息。 现在它只在其中一个中执行此操作。当我关闭那个并发送另一条消息时,其余的会收到它。

【问题讨论】:

【参考方案1】:

基本上,您只需要给消费者起不同的名字。 Rebus 为每个生产者创建一个主题(基于程序集、命名空间、类型),并为这些主题中的每个消费者创建订阅。 如果两个消费者使用相同的名字,他们就会竞争消息。

                .Transport(t => t.UseAzureServiceBus(connectionString, "consumer1"))
                .Transport(t => t.UseAzureServiceBus(connectionString, "consumer2"))

完整示例:https://github.com/rebus-org/RebusSamples/tree/master/PubSubNative

其他一些有用的链接:

How does Rebus work with Azure Service Bus topics? https://github.com/rebus-org/Rebus/wiki/Azure-Service-Bus-transport

【讨论】:

感谢这个作品,但我很困惑。第二个参数是inputQueueAddress,我认为它需要相同(例如服务器)才能使消息到达那里。现在我在每个类中测试这个参数的不同字符串值。而且它还在工作。 现在两个消费者都消费所有字符串消息。基本上我正在玩的是 1 个生产者 - 2 个消费者(发送 1 条消息并在消费者中接收)现在正在工作。现在我想尝试的是 1 个生产者发送 1 条字符串消息但给给定的消费者。在这种情况下我需要使用 .Send() 吗? 很高兴我能帮上忙。不过,您可能应该在一个新问题中提出这个问题。我不太清楚你的意思,如果你提出一个新问题,就会被更多人看到。

以上是关于同一消息的多个订阅者 Rebus Azure 服务总线的主要内容,如果未能解决你的问题,请参考以下文章

Azure Queues dequeue 计数随着 Rebus 核心增长

Azure 服务总线 - 订阅者可以独立订阅订阅并共享相同的消息?

为啥 Azure 事件中心订阅者不起作用?

Azure 服务总线主题订阅者接收订单

消息正文上的 Azure 服务总线订阅筛选器

C# 消息队列-Microsoft Azure service bus 服务总线