RabbitMQ 消费者断线重连

Posted lee576

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消费者断线重连相关的知识,希望对你有一定的参考价值。

 虽然RabbitMQ.Client 库有心跳机制,有断线重连机制,但是在网络断掉的时候并不能重连,下面的代码就是解决这个问题,经本人测试有效,适合作为挂机程序

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace HenryMes.RabbitMQ.Consumer

    class Program
    
        /// <summary>
        /// RabbitMQ 服务器IP
        /// </summary>
        private static string ServerIp => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostIp")) ? "127.0.0.1" :
            Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostIp");

        /// <summary>
        /// RabbitMQ 服务器端口, 默认 5672, 网页监控页面是 15672
        /// </summary>
        private static string ServerPort => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostPort")) ? "5672" :
            Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostPort");

        /// <summary>
        /// RabbitMQ 用户名, 默认 guest
        /// </summary>
        private static string UserName => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "UserName")) ? "guest" :
            Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "UserName");

        /// <summary>
        /// RabbitMQ 密码, 默认 guest
        /// </summary>
        private static string Password => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Password")) ? "guest" :
            Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Password");

        public static string ChannelName => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Channel")) ? "tags_queue" :
            Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Channel");

        /// <summary>
        /// Main entry point to the RabbitMQ .NET AMQP client API. Constructs RabbitMQ.Client.IConnection instances.
        /// </summary>
        private static ConnectionFactory _factory;

        private static readonly object Sync = new object();

        /// <summary>
        /// Main interface to an AMQP connection.
        /// </summary>
        private static IConnection _conn;

        public static IModel _model;

        static void Connect()
        
            try
            
                //连接工厂
                _factory = new ConnectionFactory();

                //连接工厂信息
                _factory.HostName = ServerIp;// "localhost";

                int rabbitmq_port = 5672;    // 默认是5672端口
                int.TryParse(ServerPort, out rabbitmq_port);
                _factory.Port = rabbitmq_port;// "5672"

                _factory.UserName = UserName;
                _factory.Password = Password;
                _factory.VirtualHost = "/";

                _factory.RequestedHeartbeat = TimeSpan.FromSeconds(2);//心跳包
                _factory.AutomaticRecoveryEnabled = true;//自动重连
                _factory.TopologyRecoveryEnabled = true;//拓扑重连
                _factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

                //创建连接
                _conn = _factory.CreateConnection();

                //断开连接时,调用方法自动重连
                _conn.ConnectionShutdown += Connection_ConnectionShutdown;

                //创建接收频道
                _model = _conn.CreateModel();

                // 监控消息
                RabbitmqMessageConsume();

                Console.WriteLine("尝试连接至RabbitMQ服务器:" + ServerIp);
            
            catch (BrokerUnreachableException e)
            
                throw e;
            
            catch (Exception ex)
            
                throw ex;
            
        

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        
            Console.WriteLine("RabbitMQ已经断开连接,正在尝试重新连接至RabbitMQ服务器");
            Reconnect();
        

        private static void Reconnect()
        
            try
            
                //清除连接及频道
                Cleanup();
                var mres = new ManualResetEventSlim(false); // state is initially false
                while (!mres.Wait(3000)) // loop until state is true, checking every 3s
                
                    try
                    
                        //连接
                        Connect();
                        mres.Set(); // state set to true - breaks out of loop
                    
                    catch (Exception ex)
                    
                        Console.WriteLine("RabbitMQ尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex);
                    
                
            
            catch (Exception ex)
            
                Console.WriteLine("RabbitMQ尝试重新连接RabbitMQ服务器出现错误:" + ex.Message, ex);
            
        

        static void Cleanup()
        
            try
            
                if (_model != null && _model.IsOpen)
                
                    try
                    
                        _model.Close();
                    
                    catch (Exception ex)
                    
                        Console.WriteLine("RabbitMQ重新连接,正在尝试关闭之前的Channel[接收],但遇到错误", ex);
                    
                    _model = null;
                

                if (_conn != null && _conn.IsOpen)
                
                    try
                    
                        _conn.Close();
                    
                    catch (Exception ex)
                    
                        Console.WriteLine("RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误", ex);
                    
                    _conn = null;
                
                // 重置OPC连接
                if (OpcManager.GetInstance.Instance != null)
                
                    OpcManager.GetInstance.Instance = null;
                
            
            catch (IOException ex)
            
                throw ex;
            
        

        private static void RabbitmqMessageConsume()
        
            try
            
                if (_conn == null || !_conn.IsOpen) throw new Exception("连接为空或连接已经关闭");
                if (_model == null || !_model.IsOpen) throw new Exception("通道为空或通道已经关闭");


                bool queueDurable = true;

                //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                _model.QueueDeclare(ChannelName, queueDurable, false, false, null);
                //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
                _model.BasicQos(0, 1, true);

                //创建基于该队列的消费者,绑定事件
                var consumer = new EventingBasicConsumer(_model);

                //回应消息监控
                consumer.Received += SyncData_Received;

                //绑定消费者
                _model.BasicConsume(ChannelName, //队列名
                                      false,     //false:手动应答;true:自动应答
                                      consumer);

                Console.WriteLine("开始监控RabbitMQ服务器,队列" + ChannelName);

            
            catch (AggregateException ae)
            
                //错误信息去重
                var errorList = (from error in ae.InnerExceptions select error.Message).Distinct().ToList();
                //打印所有错误信息
                foreach (string error in errorList)
                
                    Console.WriteLine(error);
                
            
            catch (Exception ex)
            
                throw ex;
            
        

        private static void SyncData_Received(object sender, BasicDeliverEventArgs e)
        
            var watch = new Stopwatch();
            watch.Start();

            var body = e.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            Console.Write("尝试读取OPC变量: ");

            //var obj = JsonConvert.DeserializeObject<string[]>(message);
            //var tags = HenryMes.OpcManager.GetInstance.Instance.Read<string>(obj);

            //每页条数   
            var nodes = JsonConvert.DeserializeObject<string[]>(message).ToList();
            const int pageSize = 500;
            //页码 0也就是第一条 
            int pageNum = 0;
            var tasks = new List<Task>();
            while (pageNum * pageSize < nodes.Count)
            
                var pageNodes = nodes.Skip(pageNum * pageSize).Take(pageSize).Select(t => t).ToArray();
                tasks.Add(Task.Factory.StartNew(() =>
                
                    var tags = OpcManager.GetInstance.Instance.Read<string>(pageNodes);
                ));
                pageNum++;
            
            Task.WaitAll(tasks.ToArray());

            // 确认收到
            _model.BasicAck(e.DeliveryTag, false);
            watch.Stop();
            TimeSpan timeSpan = watch.Elapsed;
            Console.WriteLine("处理耗时0ms.", watch.ElapsedMilliseconds);
        

        static void Main(string[] args)
        
            Reconnect();
            Console.ReadKey();
        
    



以上是关于RabbitMQ 消费者断线重连的主要内容,如果未能解决你的问题,请参考以下文章

netty的断线重连问题

netty 心跳包和断线重连机制

unidac 断线重连

WebSocket 断线重连引入心跳的原因

NodeMCU入门:断线自动重连,指示灯显示连接状态

websocket 断线重连