RabbitMQ消息队列随笔
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列随笔相关的知识,希望对你有一定的参考价值。
本文权当各位看官对RabbitMQ的基本概念以及使用场景有了一定的了解,如果你还对它所知甚少或者只是停留在仅仅是听说过,建议你先看看这篇文章,在对RabbitMQ有了基本认识后,我们正式开启我们的RabbitMQ之旅吧,希望本文能够帮助大家在实际用到消息队列时有所帮助,如有表述的不当之处,还望各位看官指正。
一、消息队列的安装
1、 RabbitMQ是用Erlang编程语言进行开发,所以首先得在Erlang官网下载Erlang运行的环境,如图,选择所需对应文件进行下载,并进行安装:
2、 设置环境变量,如图
3、 去RabbitMQ官网下载对应操作系统的安装文件,并进行安装如图:
4、 以管理员方式打开cmd,定位到RabbitMQ安装目录sbin文件夹下,依次执行以下命令
(1) rabbitmq-service install
(2) rabbitmq-service enable
(3) rabbitmq-service start
如图:
到这里我们的RabbitMQ服务已经安装好了,利用Windows + R键,输入services.msc查看,RabbitMQ服务已经处于运行的状态了。
5、 到这里不要以为我们的工作就结束了,接下来我们为RabbitMQ设置用户以及密码
在cmd中执行 rabbitmqctl list_users 查看RabbitMQ已存在的用户,如图
这里发现有两个账号 rabbit 是我之前添加的,guest 是RabbitMQ自带的,
执行以下命令,添加RabbitMQ用户,并设置相应权限:
rabbitmqctl add_user bestadmin 123456
rabbitmqctl set_permissions bestadmin ".*" ".*" ".*"
rabbitmqctl set_user_tags bestadmin administrator
如图:
6、 RabbitMQ有一个可视化界面,进行消息的管理,不过需要用命名rabbitmq-plugins enable rabbitmq_management 命令进行启动,接下来我们便可以在浏览器输入127.0.0.1:15672中进行查看,如图:
输入我们刚设置的用户名bestuser 密码123,如图:
一、消息队列的使用
我们知道RabbitMQ的Exchange常用交换器类型分为fanout、direct、topic、headers 4种类型,这里我们将对fanout、direct、topic 3种类型以实际代码的形式进行讲解,至于关于交换器对各类型的具体讲解,请参照文章开始给出的链接进行了解,这里就不再赘述,我们新建了如下图的解决方案:
1、RabbitMQHelper 帮助类,对常用的消息入队以及消费消息进行了简单的封装:
1 /// <summary> 2 /// RabbitMQHelper 3 /// </summary> 4 public class RabbitMQHelper 5 { 6 private static ConnectionFactory _connectionFactory = null; 7 8 /// <summary> 9 /// 构造函数 10 /// </summary> 11 static RabbitMQHelper() 12 { 13 _connectionFactory = new ConnectionFactory(); 14 _connectionFactory.HostName = ConfigurationManager.AppSettings["HostName"].ToString(); 15 _connectionFactory.UserName = ConfigurationManager.AppSettings["UserName"].ToString(); 16 _connectionFactory.Password = ConfigurationManager.AppSettings["Password"].ToString(); 17 _connectionFactory.AutomaticRecoveryEnabled = true; 18 } 19 20 #region 单消息入队 21 /// <summary> 22 /// 单消息入队 23 /// </summary> 24 /// <param name="exchangeName">交换器名称</param> 25 /// <param name="exchangeType">交换器类型</param> 26 /// <param name="routingKey">路由关键字</param> 27 /// <param name="message">消息实例</param> 28 public static void Enqueue<TItem>(string exchangeName, string exchangeType, string routingKey, TItem message) 29 { 30 try 31 { 32 if (message != null) 33 { 34 using (IConnection connection = _connectionFactory.CreateConnection()) 35 { 36 using (IModel channel = connection.CreateModel()) 37 { 38 channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null); 39 string messageString = JsonConvert.SerializeObject(message); 40 byte[] body = Encoding.UTF8.GetBytes(messageString); 41 var properties = channel.CreateBasicProperties(); 42 properties.Persistent = true; //使消息持久化 43 channel.BasicPublish(exchangeName, routingKey, properties, body); 44 } 45 } 46 } 47 } 48 catch (Exception ex) 49 { 50 throw new Exception(ex.Message); 51 } 52 } 53 #endregion 54 55 #region 消息批量入队 56 /// <summary> 57 /// 消息批量入队 58 /// </summary> 59 /// <param name="exchangeName">交换器名称</param> 60 /// <param name="exchangeType">交换器类型</param> 61 /// <param name="routingKey">路由关键字</param> 62 /// <param name="list">消息集合</param> 63 public static void Enqueue<TItem>(string exchangeName, string exchangeType, string routingKey, List<TItem> list) 64 { 65 try 66 { 67 if (list != null && list.Count > 0) 68 { 69 using (IConnection connection = _connectionFactory.CreateConnection()) 70 { 71 using (IModel channel = connection.CreateModel()) 72 { 73 foreach (TItem item in list) 74 { 75 if (item != null) 76 { 77 channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null); 78 string messageString = JsonConvert.SerializeObject(item); 79 byte[] body = Encoding.UTF8.GetBytes(messageString); 80 var properties = channel.CreateBasicProperties();//使消息持久化 81 properties.Persistent = true; 82 channel.BasicPublish(exchangeName, routingKey, properties, body); 83 } 84 } 85 } 86 } 87 } 88 } 89 catch (Exception ex) 90 { 91 throw new Exception(ex.Message); 92 } 93 } 94 #endregion 95 96 #region 消费消息队列(旧的方式) 97 /// <summary> 98 /// 消费消息队列 99 /// </summary> 100 /// <typeparam name="TItem">消息对象</typeparam> 101 /// <param name="exchangeName">交换器名称</param> 102 /// <param name="exchangeType">交换器类型</param> 103 /// <param name="routingKey">路由关键字</param> 104 /// <param name="queueName">队列名称</param> 105 /// <param name="func">消费消息的具体操作</param> 106 /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param> 107 108 public static void Consume<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, Func<TItem, bool> func, int tryTimes = 5) 109 { 110 try 111 { 112 int consumeCount = 0;//尝试消费次数 113 bool isConsumeSuccess = false;// 是否消费成功 114 using (IConnection connection = _connectionFactory.CreateConnection()) 115 { 116 using (IModel channel = connection.CreateModel()) 117 { 118 channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null); 119 channel.QueueDeclare(queueName, true, false, false, null); 120 channel.QueueBind(queueName, exchangeName, routingKey); 121 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 122 channel.BasicConsume(queueName, false, consumer); 123 while (true) 124 { 125 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 126 var body = ea.Body; 127 var message = Encoding.UTF8.GetString(body); 128 TItem queueMessage = JsonConvert.DeserializeObject<TItem>(message); 129 if (queueMessage != null) 130 { 131 while (!isConsumeSuccess) 132 { 133 consumeCount++; 134 isConsumeSuccess = func(queueMessage); 135 if (isConsumeSuccess || consumeCount >= tryTimes) channel.BasicAck(ea.DeliveryTag, false);//将队列里面的消息进行释放 136 } 137 } 138 } 139 } 140 } 141 } 142 catch (Exception ex) 143 { 144 throw new Exception(ex.Message); 145 } 146 } 147 #endregion 148 149 #region 消费消息队列(新的方式) 150 /// <summary> 151 /// 消费消息队列 152 /// </summary> 153 /// <typeparam name="TItem">消息对象</typeparam> 154 /// <param name="exchangeName">交换器名称</param> 155 /// <param name="exchangeType">交换器类型</param> 156 /// <param name="routingKey">路由关键字</param> 157 /// <param name="queueName">队列名称</param> 158 /// <param name="func">消费消息的具体操作</param> 159 /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param> 160 public static void NewConsume<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, Func<TItem, bool> func, int tryTimes = 5) 161 { 162 try 163 { 164 int consumeCount = 0;//尝试消费次数 165 bool isConsumeSuccess = false;// 是否消费成功 166 using (IConnection connection = _connectionFactory.CreateConnection()) 167 { 168 using (IModel channel = connection.CreateModel()) 169 { 170 channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null); 171 channel.QueueDeclare(queueName, true, false, false, null); 172 channel.QueueBind(queueName, exchangeName, routingKey); 173 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); 174 consumer.Received += (sender, eventArgs) => 175 { 176 byte[] body = eventArgs.Body; 177 if (body != null && body.Length > 0) 178 { 179 string message = Encoding.UTF8.GetString(body); 180 if (!string.IsNullOrWhiteSpace(message)) 181 { 182 TItem queueMessage = JsonConvert.DeserializeObject<TItem>(message); 183 if (queueMessage != null) 184 { 185 while (!isConsumeSuccess) 186 { 187 consumeCount++; 188 isConsumeSuccess = func(queueMessage); 189 if (isConsumeSuccess || consumeCount >= tryTimes) channel.BasicAck(eventArgs.DeliveryTag, false); 190 } 191 } 192 } 193 } 194 }; 195 channel.BasicConsume(queueName, false, consumer); 196 } 197 } 198 } 199 catch (Exception ex) 200 { 201 throw new Exception(ex.Message); 202 } 203 } 204 #endregion 205 }
备注:需要说明的是这里在对消费消息的方法进行封装的过程中使用了泛型委托,这样我们就只需要按照自己的业务需求,对消息进行处理了。
2、 fanout类型:简单的说适合这样的场景,一个生产者产生的消息,需要将该消息发送到多个消息队列,供多个消费者进行消费。这里,为了对该场景进行还原,所以新建了RabbitMQConsumer,RabbitMQConsumer1两个消费者。
生产者代码:
public Form1() { InitializeComponent(); } private void Producer_Click(object sender, EventArgs e) { RabbitMQHelper.Enqueue("developExchange", "fanout", "", new Developer() { Id = Guid.NewGuid(), Name = "nickdeng", Position = "开发" }); }
消费者1代码:
class Program { static void Main(string[] args) { RabbitMQHelper.Consume<Developer>("developExchange", "fanout", "", "developQueue", ConsumeMessage); } /// <summary> /// 消费消息 /// </summary> /// <param name="developer">处理对象</param> /// <returns>消费结果</returns> public static bool ConsumeMessage(Developer developer) { string message = JsonConvert.SerializeObject(developer); Console.Write(message); return true; } }
消费者2代码:
class Program { static void Main(string[] args) { RabbitMQHelper.Consume<Developer>("developExchange", "fanout", "", "developQueue1", ConsumeMessage); } /// <summary> /// 消费消息 /// </summary> /// <param name="developer">处理对象</param> /// <returns>消费结果</returns> public static bool ConsumeMessage(Developer developer) { string message = JsonConvert.SerializeObject(developer); Console.Write(message); return true; } }
消费者1与消费者2的代码,眨眼一看,不是一样的吗?仔细看会发现它们在的消息队列名称不一样,消费者1的队列名称是“developQueue”,消息者2的队列名称是“developQueue1”,因为这两个消息队列都与交换器“developExchange”进行了绑定,所以生产者产生的消息将被推送到这两个消息队列。运行代码,得到如下图结果:
3、direct类型:直译过来就是直接的意思,该类型适用于点对点的使用场景,生产者将消息发送到指定的消息队列:
生产者代码:
public Form1() { InitializeComponent(); } private void Producer_Click(object sender, EventArgs e) { RabbitMQHelper.Enqueue("developExchange1", "direct", "directkey", new Developer() { Id = Guid.NewGuid(), Name = "nickdeng", Position = "开发" }); }
消费者1代码:
static void Main(string[] args) { RabbitMQHelper.Consume<Developer>("developExchange1", "direct", "directkey", "developQueue", ConsumeMessage); } /// <summary> /// 消费消息 /// </summary> /// <param name="developer">处理对象</param> /// <returns>消费结果</returns> public static bool ConsumeMessage(Developer developer) { string message = JsonConvert.SerializeObject(developer); Console.Write(message); return true; }
消费者2代码:
static void Main(string[] args) { RabbitMQHelper.Consume<Developer>("developExchange1", "direct", "directkey1", "developQueue1", ConsumeMessage); } /// <summary> /// 消费消息 /// </summary> /// <param name="developer">处理对象</param> /// <returns>消费结果</returns> public static bool ConsumeMessage(Developer developer) { string message = JsonConvert.SerializeObject(developer); Console.Write(message); return true; }
生产者的路由关键字是“directkey”,消费者1的路由关键字为”directkey“,消费者2的路由关键字为”directkey1“,紧紧一字相差,生产者产生的消息就只有消费者1能够收到,运行代码得到如图结果:
至于topic类型,这里就不再以代码进行讲解了,其实大致使用方法与上面的direct类型相似,不同之处在于topic类型可通过路由关键字进行模糊匹配,将消息路由到相应队列,大家可根据自己的实际使用场景,进行类型的选择。
以上是关于RabbitMQ消息队列随笔的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ学习笔记五:RabbitMQ之优先级消息队列
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码