如何在 .Net 中使用不同类型的消费者使用 RabbitMq 消息?

Posted

技术标签:

【中文标题】如何在 .Net 中使用不同类型的消费者使用 RabbitMq 消息?【英文标题】:How can I consume RabbitMq messages with different type of consumers in .Net? 【发布时间】:2015-05-25 07:19:00 【问题描述】:

消息类型:“PublishX”

消费者:

Type1ConsumerX

Type2ConsumerX

Type3ConsumerX

所有消费者必须立即捕获消息,但在自己内部同步消费..

例如,队列中有 100 条“PublishX”消息。 Type1ConsumerX 消费了 30 条消息(同步),Type2ConsumerX 消费了 50 条消息(同步),Type3ConsumerX 消费了 100 条消息(同步)。 我怎么知道消息被“所有类型的消费者”消费了?

RabbitMQ/MassTransit 能否推送消息给消费者?

RabbitMQ/MassTransit 能否以间隔 (1s) 推送消息(合并它们)以减少网络流量?

RabbitMQ/MassTransit 能否将相同的消息推送给不同类型的消费者?

【问题讨论】:

我不太明白这个问题。您是否只想设置竞争消费者,即rabbitmq.com/tutorials/tutorial-two-dotnet.html 没有消费者POP消息,认为我们有一个“NOTIFICATION_ITEM”并且我们不想为SmsConsumer减速通知..(认为我们的SmsConsumer比MailConsumer快)有1000 NotificationItem 在队列中,因此 SmsConsumer 和 MailConsumer 将使用相同的队列。 感谢您的链接,我想我需要一个将相同消息发送到 3 个队列的 Exchange?像 channel.exchange_declare(exchange='logs', type='fanout') 【参考方案1】:

如果我正确理解了这个问题,您只需要设置一个基本的pub/sub 模式。这将允许您将相同的消息传递给多个消费者。

示例发布者:

public static void PublishMessageToFanout()

    var factory = new ConnectionFactory  HostName = "localhost" ;

    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    
        channel.ExchangeDeclare("messages", "fanout");

        var message = new Message  Text = "This is a message to send" ;

        var json = JsonConvert.SerializeObject(message);
        var body = Encoding.UTF8.GetBytes(json);

        channel.BasicPublish("messages", string.Empty, null, body);
    

消费者示例:

SubscribeToMessages("sms-messages", (s) => Console.WriteLine("SMS Message: 0", s));
SubscribeToMessages("email-messages", (s) => Console.WriteLine("Email Message: 0", s));

public static void SubscribeToMessages(string queueName, Action<string> messageAction)

    var factory = new ConnectionFactory()  HostName = "localhost" ;

    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    
        channel.ExchangeDeclare("messages", "fanout");
        channel.QueueDeclare(queueName, true, false, false, null);
        channel.QueueBind(queueName, "messages", string.Empty);

        var consumer = new QueueingBasicConsumer(channel);

        channel.BasicConsume(queueName, true, consumer);

        while (true)
        
            var ea = consumer.Queue.Dequeue();

            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            messageAction(message);
        
    

如果您在单独的进程或控制台应用程序中运行 SubscribeToMessages 循环,您会看到无论何时调用 PublishMessageToFanout,它们都会打印出消息。您还将看到两个队列都存在于 RabbitMQ 管理中的 Queues 下。

【讨论】:

【参考方案2】:

关于您问题中的 MassTransit 部分

    RabbitMQ/MassTransit 向消费者推送消息? 是的,MassTransit 将消息发布到总线,然后由消费者处理它们

    RabbitMQ/MassTransit 能否以间隔 (1s) 推送消息(合并它们)以减少网络流量? 不知道是否有此功能,您可以编写自己的功能,但您必须非常小心丢失消息。

    RabbitMQ/MassTransit 能否将相同的消息推送给不同类型的消费者? 是的,多个消费者可以消费相同类型的消息。

我编写了一个简单的 hello world 应用程序来展示基础知识 - http://nodogmablog.bryanhogan.net/2015/04/mass-transit-with-rabbitmq-hello-world/

【讨论】:

以上是关于如何在 .Net 中使用不同类型的消费者使用 RabbitMq 消息?的主要内容,如果未能解决你的问题,请参考以下文章

MassTransit - 如何创建具有通用类型和 2 个不同消费者的交换主题

在 .NET MVC 4 (Web API) 中,如何拦截对控制器的请求并更改其 Content-Type?

详解RocketMQ不同类型的消费者

如何根据 HTTP 响应中的条件返回不同的对象类型(.net 核心)

如何在 C# 中使用反射调用具有不同类型参数的方法

如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器