RabbitMQ消息队列随笔

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列随笔相关的知识,希望对你有一定的参考价值。

本文权当各位看官对RabbitMQ的基本概念以及使用场景有了一定的了解,如果你还对它所知甚少或者只是停留在仅仅是听说过,建议你先看看这篇文章,在对RabbitMQ有了基本认识后,我们正式开启我们的RabbitMQ之旅吧,希望本文能够帮助大家在实际用到消息队列时有所帮助,如有表述的不当之处,还望各位看官指正。

一、消息队列的安装

1、 RabbitMQ是用Erlang编程语言进行开发,所以首先得在Erlang官网下载Erlang运行的环境,如图,选择所需对应文件进行下载,并进行安装:

技术分享

2、 设置环境变量,如图

技术分享

3、 去RabbitMQ官网下载对应操作系统的安装文件,并进行安装如图:

技术分享

4、 以管理员方式打开cmd,定位到RabbitMQ安装目录sbin文件夹下,依次执行以下命令

(1)     rabbitmq-service install

(2)     rabbitmq-service enable

(3)     rabbitmq-service start

如图:技术分享

到这里我们的RabbitMQ服务已经安装好了,利用Windows + R键,输入services.msc查看,RabbitMQ服务已经处于运行的状态了。

技术分享

5、 到这里不要以为我们的工作就结束了,接下来我们为RabbitMQ设置用户以及密码

在cmd中执行 rabbitmqctl list_users 查看RabbitMQ已存在的用户,如图

技术分享

这里发现有两个账号 rabbit 是我之前添加的,guest 是RabbitMQ自带的,

执行以下命令,添加RabbitMQ用户,并设置相应权限:

rabbitmqctl add_user bestadmin 123456

rabbitmqctl set_permissions  bestadmin ".*"  ".*"  ".*"

rabbitmqctl set_user_tags bestadmin administrator

如图:技术分享

 

 

6、 RabbitMQ有一个可视化界面,进行消息的管理,不过需要用命名rabbitmq-plugins enable rabbitmq_management 命令进行启动,接下来我们便可以在浏览器输入127.0.0.1:15672中进行查看,如图:

技术分享

输入我们刚设置的用户名bestuser 密码123,如图:

技术分享

一、消息队列的使用

我们知道RabbitMQ的Exchange常用交换器类型分为fanout、direct、topic、headers 4种类型,这里我们将对fanout、direct、topic 3种类型以实际代码的形式进行讲解,至于关于交换器对各类型的具体讲解,请参照文章开始给出的链接进行了解,这里就不再赘述,我们新建了如下图的解决方案:

技术分享

1、RabbitMQHelper 帮助类,对常用的消息入队以及消费消息进行了简单的封装:

  1     /// <summary>
  2     /// RabbitMQHelper
  3     /// </summary>
  4     public class RabbitMQHelper
  5     {
  6         private static ConnectionFactory _connectionFactory = null;
  7 
  8         /// <summary>
  9         /// 构造函数
 10         /// </summary>
 11         static RabbitMQHelper()
 12         {
 13             _connectionFactory = new ConnectionFactory();
 14             _connectionFactory.HostName = ConfigurationManager.AppSettings["HostName"].ToString();
 15             _connectionFactory.UserName = ConfigurationManager.AppSettings["UserName"].ToString();
 16             _connectionFactory.Password = ConfigurationManager.AppSettings["Password"].ToString();
 17             _connectionFactory.AutomaticRecoveryEnabled = true;
 18         }
 19 
 20         #region 单消息入队
 21         /// <summary>
 22         /// 单消息入队
 23         /// </summary>
 24         /// <param name="exchangeName">交换器名称</param>
 25         /// <param name="exchangeType">交换器类型</param>
 26         /// <param name="routingKey">路由关键字</param>
 27         /// <param name="message">消息实例</param>
 28         public static void Enqueue<TItem>(string exchangeName, string exchangeType, string routingKey, TItem message)
 29         {
 30             try
 31             {
 32                 if (message != null)
 33                 {
 34                     using (IConnection connection = _connectionFactory.CreateConnection())
 35                     {
 36                         using (IModel channel = connection.CreateModel())
 37                         {
 38                             channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
 39                             string messageString = JsonConvert.SerializeObject(message);
 40                             byte[] body = Encoding.UTF8.GetBytes(messageString);
 41                             var properties = channel.CreateBasicProperties();
 42                             properties.Persistent = true; //使消息持久化
 43                             channel.BasicPublish(exchangeName, routingKey, properties, body);
 44                         }
 45                     }
 46                 }
 47             }
 48             catch (Exception ex)
 49             {
 50                 throw new Exception(ex.Message);
 51             }
 52         }
 53         #endregion
 54 
 55         #region 消息批量入队
 56         /// <summary>
 57         /// 消息批量入队
 58         /// </summary>
 59         /// <param name="exchangeName">交换器名称</param>
 60         /// <param name="exchangeType">交换器类型</param>
 61         /// <param name="routingKey">路由关键字</param>
 62         /// <param name="list">消息集合</param>
 63         public static void Enqueue<TItem>(string exchangeName, string exchangeType, string routingKey, List<TItem> list)
 64         {
 65             try
 66             {
 67                 if (list != null && list.Count > 0)
 68                 {
 69                     using (IConnection connection = _connectionFactory.CreateConnection())
 70                     {
 71                         using (IModel channel = connection.CreateModel())
 72                         {
 73                             foreach (TItem item in list)
 74                             {
 75                                 if (item != null)
 76                                 {
 77                                     channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
 78                                     string messageString = JsonConvert.SerializeObject(item);
 79                                     byte[] body = Encoding.UTF8.GetBytes(messageString);
 80                                     var properties = channel.CreateBasicProperties();//使消息持久化
 81                                     properties.Persistent = true;
 82                                     channel.BasicPublish(exchangeName, routingKey, properties, body);
 83                                 }
 84                             }
 85                         }
 86                     }
 87                 }
 88             }
 89             catch (Exception ex)
 90             {
 91                 throw new Exception(ex.Message);
 92             }
 93         }
 94         #endregion
 95 
 96         #region 消费消息队列(旧的方式)
 97         /// <summary>
 98         /// 消费消息队列
 99         /// </summary>
100         /// <typeparam name="TItem">消息对象</typeparam>
101         /// <param name="exchangeName">交换器名称</param>
102         /// <param name="exchangeType">交换器类型</param>
103         /// <param name="routingKey">路由关键字</param>
104         /// <param name="queueName">队列名称</param>
105         /// <param name="func">消费消息的具体操作</param>
106         /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param>
107 
108         public static void Consume<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, Func<TItem, bool> func, int tryTimes = 5)
109         {
110             try
111             {
112                 int consumeCount = 0;//尝试消费次数
113                 bool isConsumeSuccess = false;// 是否消费成功
114                 using (IConnection connection = _connectionFactory.CreateConnection())
115                 {
116                     using (IModel channel = connection.CreateModel())
117                     {
118                         channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
119                         channel.QueueDeclare(queueName, true, false, false, null);
120                         channel.QueueBind(queueName, exchangeName, routingKey);
121                         QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
122                         channel.BasicConsume(queueName, false, consumer);
123                         while (true)
124                         {
125                             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
126                             var body = ea.Body;
127                             var message = Encoding.UTF8.GetString(body);
128                             TItem queueMessage = JsonConvert.DeserializeObject<TItem>(message);
129                             if (queueMessage != null)
130                             {
131                                 while (!isConsumeSuccess)
132                                 {
133                                     consumeCount++;
134                                     isConsumeSuccess = func(queueMessage);
135                                     if (isConsumeSuccess || consumeCount >= tryTimes) channel.BasicAck(ea.DeliveryTag, false);//将队列里面的消息进行释放
136                                 }
137                             }
138                         }
139                     }
140                 }
141             }
142             catch (Exception ex)
143             {
144                 throw new Exception(ex.Message);
145             }
146         }
147         #endregion
148 
149         #region 消费消息队列(新的方式)
150         /// <summary>
151         /// 消费消息队列
152         /// </summary>
153         /// <typeparam name="TItem">消息对象</typeparam>
154         /// <param name="exchangeName">交换器名称</param>
155         /// <param name="exchangeType">交换器类型</param>
156         /// <param name="routingKey">路由关键字</param>
157         /// <param name="queueName">队列名称</param>
158         /// <param name="func">消费消息的具体操作</param>
159         /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param>
160         public static void NewConsume<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, Func<TItem, bool> func, int tryTimes = 5)
161         {
162             try
163             {
164                 int consumeCount = 0;//尝试消费次数
165                 bool isConsumeSuccess = false;// 是否消费成功
166                 using (IConnection connection = _connectionFactory.CreateConnection())
167                 {
168                     using (IModel channel = connection.CreateModel())
169                     {
170                         channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
171                         channel.QueueDeclare(queueName, true, false, false, null);
172                         channel.QueueBind(queueName, exchangeName, routingKey);
173                         EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
174                         consumer.Received += (sender, eventArgs) =>
175                         {
176                             byte[] body = eventArgs.Body;
177                             if (body != null && body.Length > 0)
178                             {
179                                 string message = Encoding.UTF8.GetString(body);
180                                 if (!string.IsNullOrWhiteSpace(message))
181                                 {
182                                     TItem queueMessage = JsonConvert.DeserializeObject<TItem>(message);
183                                     if (queueMessage != null)
184                                     {
185                                         while (!isConsumeSuccess)
186                                         {
187                                             consumeCount++;
188                                             isConsumeSuccess = func(queueMessage);
189                                             if (isConsumeSuccess || consumeCount >= tryTimes) channel.BasicAck(eventArgs.DeliveryTag, false);
190                                         }
191                                     }
192                                 }
193                             }
194                         };
195                         channel.BasicConsume(queueName, false, consumer);
196                     }
197                 }
198             }
199             catch (Exception ex)
200             {
201                 throw new Exception(ex.Message);
202             }
203         }
204         #endregion
205     }

 备注:需要说明的是这里在对消费消息的方法进行封装的过程中使用了泛型委托,这样我们就只需要按照自己的业务需求,对消息进行处理了。

 2、 fanout类型:简单的说适合这样的场景,一个生产者产生的消息,需要将该消息发送到多个消息队列,供多个消费者进行消费。这里,为了对该场景进行还原,所以新建了RabbitMQConsumer,RabbitMQConsumer1两个消费者。

 生产者代码:

       public Form1()
        {
            InitializeComponent();
        }

        private void Producer_Click(object sender, EventArgs e)
        {
            RabbitMQHelper.Enqueue("developExchange", "fanout", "", new Developer() { Id = Guid.NewGuid(), Name = "nickdeng", Position = "开发" });
        }

 消费者1代码:

  class Program
    {
        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange", "fanout", "", "developQueue", ConsumeMessage);
        }

        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="developer">处理对象</param>
        /// <returns>消费结果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }
    }

消费者2代码:

  class Program
    {
        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange", "fanout", "", "developQueue1", ConsumeMessage);
        }

        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="developer">处理对象</param>
        /// <returns>消费结果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }
    }

消费者1与消费者2的代码,眨眼一看,不是一样的吗?仔细看会发现它们在的消息队列名称不一样,消费者1的队列名称是“developQueue”,消息者2的队列名称是“developQueue1”,因为这两个消息队列都与交换器“developExchange”进行了绑定,所以生产者产生的消息将被推送到这两个消息队列。运行代码,得到如下图结果:

技术分享

 3、direct类型:直译过来就是直接的意思,该类型适用于点对点的使用场景,生产者将消息发送到指定的消息队列:

  生产者代码:

      public Form1()
        {
            InitializeComponent();
        }

        private void Producer_Click(object sender, EventArgs e)
        {
            RabbitMQHelper.Enqueue("developExchange1", "direct", "directkey", new Developer() { Id = Guid.NewGuid(), Name = "nickdeng", Position = "开发" });
        }

消费者1代码:

        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange1", "direct", "directkey", "developQueue", ConsumeMessage);
        }

        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="developer">处理对象</param>
        /// <returns>消费结果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }

消费者2代码:

        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange1", "direct", "directkey1", "developQueue1", ConsumeMessage);
        }

        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="developer">处理对象</param>
        /// <returns>消费结果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }

生产者的路由关键字是“directkey”,消费者1的路由关键字为”directkey“,消费者2的路由关键字为”directkey1“,紧紧一字相差,生产者产生的消息就只有消费者1能够收到,运行代码得到如图结果:

技术分享

 至于topic类型,这里就不再以代码进行讲解了,其实大致使用方法与上面的direct类型相似,不同之处在于topic类型可通过路由关键字进行模糊匹配,将消息路由到相应队列,大家可根据自己的实际使用场景,进行类型的选择。

以上是关于RabbitMQ消息队列随笔的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ原理与相关操作

RabbitMQ---延迟队列,整合springboot

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

Rabbitmq 消息队列

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)