RabbitMQ C#客户端自动重连
Posted weschen
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ C#客户端自动重连相关的知识,希望对你有一定的参考价值。
重要参考文章来源:http://gigi.nullneuron.net/gigilabs/resilient-connections-with-rabbitmq-net-client/
原因是这样的,我在Windows客户端有一个Windows后台服务,负责与服务端的数据交互,数据上传及数据下载
1.数据上传部分是使用的rabbitmq donnet库发送消息至RabbittMQ服务器,服务器另外有一个应用程序会监控RabbitMQ服务器的指定队列,完成数据的上传服务
2.数据下载部分是使用的rabbitmq donnet库监控RabbitMQ服务器指定的队列,服务器应用程序将数据发送到指定的RabbitMQ服务器的队列中,客户端就会能获取到服务器应用发送过来的数据
这样的架构还是比较好用的,使用的效果目前还行,但遇到一个比较头痛的问题,Windows后台服务一直在Windows平板电脑上运行,除非手动安装及更新应用时,才会将Windows服务进行重新安装或重启,其他的情况是不会进行重启的
了解到RabbitMQ是有自动重连的技术的,可以参考地址:https://yq.aliyun.com/articles/369969
这个效果只作用于,服务器没有挂掉,只是中间有一些网络问题时才可以进行重连
但有一种情况是没有处理到的,我们已经在客户端对RabbitMQ某个队列进行监控,但服务器突然挂掉,然后几分钟后重新启动了,这时,客户端可以重新建立连接,但却不会自动对队列产生监控,无法拿到消息
现时对代码做出一些处理
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using System.Threading; using RabbitMQ.Client.Events; using System.IO; namespace MES_MonitoringService.Common { public class RabbitMQClientHandler { private static string defaultRabbitMQHostName = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerHostName"); private static string defaultRabbitMQPort = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerPort"); private static string defaultRabbitMQUserName = Common.ConfigFileHandler.GetAppConfig("RabbitMQUserName"); private static string defaultRabbitMQPassword = Common.ConfigFileHandler.GetAppConfig("RabbitMQPassword"); private static string defaultRabbitVirtualHost = Common.ConfigFileHandler.GetAppConfig("RabbitMQVirtualHost"); // 定义一个静态变量来保存类的实例 private static RabbitMQClientHandler uniqueInstance; //定义一个标识确保线程同步 private static readonly object locker = new object(); /*-------------------------------------------------------------------------------------*/ //ConnectionFactory private static ConnectionFactory mc_ConnectionFactory = null; //Connection private static IConnection Connection; //发送频道及接收频道分开,避免互相影响,导致整个服务不可用 //Send Channel private static IModel SendChannel; //Listen Channel private static IModel ListenChannel; //数据监控队列 private static string MC_SyncDataConsume = string.Empty; // //private SyncDataHandler syncDataHandlerClass; /*-------------------------------------------------------------------------------------*/ /// <summary> /// 定义私有构造函数,使外界不能创建该类实例 /// </summary> public RabbitMQClientHandler() { Reconnect(); } /// <summary> /// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点 /// </summary> /// <returns></returns> public static RabbitMQClientHandler GetInstance() { // 当第一个线程运行到这里时,此时会对locker对象 "加锁", // 当第二个线程运行该方法时,首先检测到locker对象为"加锁"状态,该线程就会挂起等待第一个线程解锁 // lock语句运行完之后(即线程运行完之后)会对该对象"解锁" // 双重锁定只需要一句判断就可以了 if (uniqueInstance == null) { lock (locker) { // 如果类的实例不存在则创建,否则直接返回 if (uniqueInstance == null) { uniqueInstance = new RabbitMQClientHandler(); } } } return uniqueInstance; } static void Connect() { try { Common.LogHandler.WriteLog("获取RabbitMQ服务器参数:" + defaultRabbitMQHostName + ":" + defaultRabbitMQPort + " (" + defaultRabbitMQUserName + "/" + defaultRabbitMQPassword + ")"); //连接工厂 mc_ConnectionFactory = new ConnectionFactory(); //连接工厂信息 mc_ConnectionFactory.HostName = defaultRabbitMQHostName;// "localhost"; int rabbitmq_port = 5672;// 默认是5672端口 int.TryParse(defaultRabbitMQPort, out rabbitmq_port); mc_ConnectionFactory.Port = rabbitmq_port;// "5672" mc_ConnectionFactory.UserName = defaultRabbitMQUserName;// "guest"; mc_ConnectionFactory.Password = defaultRabbitMQPassword;// "guest"; mc_ConnectionFactory.VirtualHost = defaultRabbitVirtualHost;// "/" mc_ConnectionFactory.RequestedHeartbeat = 30;//心跳包 mc_ConnectionFactory.AutomaticRecoveryEnabled = true;//自动重连 mc_ConnectionFactory.TopologyRecoveryEnabled = true;//拓扑重连 mc_ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10); //创建连接 Connection = mc_ConnectionFactory.CreateConnection(); //断开连接时,调用方法自动重连 Connection.ConnectionShutdown += Connection_ConnectionShutdown; //创建发送频道 SendChannel = Connection.CreateModel(); //创建接收频道 ListenChannel = Connection.CreateModel(); //发送频道确认模式,发送了消息后,可以收到回应 SendChannel.ConfirmSelect(); if(!string.IsNullOrEmpty(MC_SyncDataConsume)) { //重新监控消息 RabbitmqMessageConsume(MC_SyncDataConsume); } Common.LogHandler.WriteLog("尝试连接至RabbitMQ服务器:" + defaultRabbitMQHostName); } catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) { throw e; } catch (Exception ex) { throw ex; } } static void Cleanup() { try { if (SendChannel != null && SendChannel.IsOpen) { try { SendChannel.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ重新连接,正在尝试关闭之前的Channel[发送],但遇到错误", ex); } SendChannel = null; } if (ListenChannel != null && ListenChannel.IsOpen) { try { ListenChannel.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ重新连接,正在尝试关闭之前的Channel[接收],但遇到错误", ex); } ListenChannel = null; } if (Connection != null && Connection.IsOpen) { try { Connection.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误", ex); } Connection = null; } } catch (IOException ex) { throw ex; } } private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) { LogHandler.WriteLog("): RabbitMQ已经断开连接,正在尝试重新连接至RabbitMQ服务器"); Reconnect(); } private static void Reconnect() { try { //清除连接及频道 Cleanup(); var mres = new ManualResetEventSlim(false); // state is initially false while (!mres.Wait(3000)) // loop until state is true, checking every 3s { try { //连接 Connect(); mres.Set(); // state set to true - breaks out of loop } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex); } } } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ尝试重新连接RabbitMQ服务器出现错误:" + ex.Message, ex); } } /*-------------------------------------------------------------------------------------*/ /// <summary> /// Direct路由,发送消息至服务端 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">队列名称</param> /// <param name="message">消息内容</param> /// <returns></returns> public bool DirectExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("连接为空或连接已经关闭"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道为空或通道已经关闭"); //创建一个持久化的队列 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //声明交换机 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Direct); //声明队列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由绑定队列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //设置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息内容转码,并发送至服务器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待确认 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出现通用问题" + ex.Message, ex); return false; } } /// <summary> /// Fanout路由,发送消息至服务端 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">队列名称</param> /// <param name="message">消息内容</param> /// <returns></returns> public bool FanoutExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("连接为空或连接已经关闭"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道为空或通道已经关闭"); //创建一个持久化的频道 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //声明交换机 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Fanout); //声明队列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由绑定队列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //设置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息内容转码,并发送至服务器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待确认 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出现通用问题" + ex.Message, ex); return false; } } /// <summary> /// Topic路由,发送消息至服务端 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">队列名称</param> /// <param name="message">消息内容</param> /// <returns></returns> public bool TopicExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("连接为空或连接已经关闭"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道为空或通道已经关闭"); //创建一个持久化的频道 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //声明交换机 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic); //声明队列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由绑定队列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //设置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息内容转码,并发送至服务器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待确认 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出现通用问题" + ex.Message, ex); return false; } } /// <summary> /// Topic路由,接收同步消息 /// </summary> /// <param name="queueName">监听的队列</param> public void SyncDataFromServer(string queueName) { try { //设定参数,方便重启RabbitMQ服务器时处理 MC_SyncDataConsume = queueName; RabbitmqMessageConsume(MC_SyncDataConsume); } catch (Exception ex) { LogHandler.WriteLog("TopicExchangeConsumeMessageFromServer运行错误:" + ex.Message, ex); throw ex; } } private static void RabbitmqMessageConsume(string queueName) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("连接为空或连接已经关闭"); if (ListenChannel == null || !ListenChannel.IsOpen) throw new Exception("通道为空或通道已经关闭"); bool queueDurable = true; string QueueName = queueName; //在MQ上定义一个持久化队列,如果名称相同不会重复创建 ListenChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息 ListenChannel.BasicQos(0, 1, false); //创建基于该队列的消费者,绑定事件 var consumer = new EventingBasicConsumer(ListenChannel); //回应消息监控 consumer.Received += SyncData_Received; //绑定消费者 ListenChannel.BasicConsume(QueueName, //队列名 false, //false:手动应答;true:自动应答 consumer); Common.LogHandler.WriteLog("开始监控RabbitMQ服务器,队列" + QueueName); } catch (Exception ex) { throw ex; } } private static void SyncData_Received(object sender, BasicDeliverEventArgs e) { try { //TOOD 验证程序退出后消费者是否退出去了 var body = e.Body; //消息主体 var message = Encoding.UTF8.GetString(body); LogHandler.WriteLog("[x] 队列接收到消息:" + message.ToString()); //处理数据 bool processSuccessFlag = new SyncDataHandler().ProcessSyncData(message); if (processSuccessFlag) { //回复确认 ListenChannel.BasicAck(e.DeliveryTag, false); } else { //未正常处理的消息,重新放回队列 ListenChannel.BasicReject(e.DeliveryTag, true); } } catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex1) { Thread.Sleep(5000); ListenChannel.BasicNack(e.DeliveryTag, false, true); } catch (Exception ex) { Thread.Sleep(5000); ListenChannel.BasicNack(e.DeliveryTag, false, true); } } } }
以上是关于RabbitMQ C#客户端自动重连的主要内容,如果未能解决你的问题,请参考以下文章