C#利用RabbitMQ实现消息订阅与发布

Posted 天马行空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#利用RabbitMQ实现消息订阅与发布相关的知识,希望对你有一定的参考价值。

在消息队列模型中,如何将消息广播到所有的消费者,这种模式成为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。

Fanout交换机模型

扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

RabbitMQ控制台操作

新增两个队列

在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

绑定fanout交换机

将两个队列绑定到系统默认的fanout交换机,如下所示:

示例效果图

生产者,采用Fanout类型交换机发布消息,如下图所示:

 

 当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

核心代码

消息发布

建立连接后,将通道声明类型为Fanout的交换机,如下所示:

 1     /// <summary>
 2     /// fanout类型交换机,发送消息
 3     /// </summary>
 4     public class RabbitMqFanoutSendHelper : RabbitMqHelper {
 5         /// <summary>
 6         /// 发送消息
 7         /// </summary>
 8         /// <param name="msg"></param>
 9         /// <returns></returns>
10         public bool SendMsg(string msg)
11         {
12             try
13             {
14                 using (var conn = GetConnection("/Alan.hsiang"))
15                 {
16                     using (var channel = conn.CreateModel())
17                     {
18                         channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
19                         
20                         var body = Encoding.UTF8.GetBytes(msg);
21 
22                         channel.BasicPublish(exchange: "amq.fanout",
23                                              routingKey: "",
24                                              basicProperties: null,
25                                              body: body);
26 
27                         //Console.WriteLine(" [x] Sent {0}", message);
28                     };
29                 };
30                 return true;
31             }
32             catch (Exception ex)
33             {
34                 throw ex;
35             }
36         }
37     }

消息订阅

建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:

 1    /// <summary>
 2     /// 扇形交换机接收消息
 3     /// </summary>
 4     public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
 5     {
 6         public RabbitMqReceiveEventHandler OnReceiveEvent;
 7 
 8         private IConnection conn;
 9 
10         private IModel channel;
11 
12         private EventingBasicConsumer consumer;
13 
14         public bool StartReceiveMsg(string queueName)
15         {
16             try
17             {
18                 conn = GetConnection("/Alan.hsiang");
19 
20                 channel = conn.CreateModel();
21                 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
22                 //此处随机取出交换机下的队列
23                 //var queueName = channel.QueueDeclare().QueueName;
24                 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
25                 consumer = new EventingBasicConsumer(channel);
26                 consumer.Received += (model, ea) =>
27                 {
28                     var body = ea.Body.ToArray();
29                     var message = Encoding.UTF8.GetString(body);
30                     //Console.WriteLine(" [x] Received {0}", message);
31                     if (OnReceiveEvent != null)
32                     {
33                         OnReceiveEvent(queueName+"::"+message);
34                     }
35                 };
36                 channel.BasicConsume(queue: queueName,
37                                         autoAck: true,
38                                         consumer: consumer);
39                 return true;
40             }
41             catch (Exception ex)
42             {
43                 throw ex;
44             }
45         }
46     }

关于RabbitMQ的基础知识介绍,可参考前几篇博文。

备注

遣怀

唐代  [杜牧]

落魄江湖载酒行,楚腰纤细掌中轻。
十年一觉扬州梦,赢得青楼薄幸名。 

以上是关于C#利用RabbitMQ实现消息订阅与发布的主要内容,如果未能解决你的问题,请参考以下文章

Rabbitmq 介绍 安装基于Queue实现生产者消费者模型基本使用消息安全之ackdurable持久化利用闲置消费发布订阅发布订阅高级之Royting(按关键字匹配)Topic关键字模糊匹配基于r

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

Python-RabbitMQ消息队列的发布与订阅

RabbitMq初探——发布与订阅

RabbitMQ 07 发布订阅模式

基于Hyperf实现RabbitMQ+WebSocket消息推送