RabbitMQ 消费者断线重连
Posted lee576
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消费者断线重连相关的知识,希望对你有一定的参考价值。
虽然RabbitMQ.Client 库有心跳机制,有断线重连机制,但是在网络断掉的时候并不能重连,下面的代码就是解决这个问题,经本人测试有效,适合作为挂机程序
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace HenryMes.RabbitMQ.Consumer
class Program
/// <summary>
/// RabbitMQ 服务器IP
/// </summary>
private static string ServerIp => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostIp")) ? "127.0.0.1" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostIp");
/// <summary>
/// RabbitMQ 服务器端口, 默认 5672, 网页监控页面是 15672
/// </summary>
private static string ServerPort => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostPort")) ? "5672" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostPort");
/// <summary>
/// RabbitMQ 用户名, 默认 guest
/// </summary>
private static string UserName => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "UserName")) ? "guest" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "UserName");
/// <summary>
/// RabbitMQ 密码, 默认 guest
/// </summary>
private static string Password => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Password")) ? "guest" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Password");
public static string ChannelName => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Channel")) ? "tags_queue" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Channel");
/// <summary>
/// Main entry point to the RabbitMQ .NET AMQP client API. Constructs RabbitMQ.Client.IConnection instances.
/// </summary>
private static ConnectionFactory _factory;
private static readonly object Sync = new object();
/// <summary>
/// Main interface to an AMQP connection.
/// </summary>
private static IConnection _conn;
public static IModel _model;
static void Connect()
try
//连接工厂
_factory = new ConnectionFactory();
//连接工厂信息
_factory.HostName = ServerIp;// "localhost";
int rabbitmq_port = 5672; // 默认是5672端口
int.TryParse(ServerPort, out rabbitmq_port);
_factory.Port = rabbitmq_port;// "5672"
_factory.UserName = UserName;
_factory.Password = Password;
_factory.VirtualHost = "/";
_factory.RequestedHeartbeat = TimeSpan.FromSeconds(2);//心跳包
_factory.AutomaticRecoveryEnabled = true;//自动重连
_factory.TopologyRecoveryEnabled = true;//拓扑重连
_factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
//创建连接
_conn = _factory.CreateConnection();
//断开连接时,调用方法自动重连
_conn.ConnectionShutdown += Connection_ConnectionShutdown;
//创建接收频道
_model = _conn.CreateModel();
// 监控消息
RabbitmqMessageConsume();
Console.WriteLine("尝试连接至RabbitMQ服务器:" + ServerIp);
catch (BrokerUnreachableException e)
throw e;
catch (Exception ex)
throw ex;
private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
Console.WriteLine("RabbitMQ已经断开连接,正在尝试重新连接至RabbitMQ服务器");
Reconnect();
private static void Reconnect()
try
//清除连接及频道
Cleanup();
var mres = new ManualResetEventSlim(false); // state is initially false
while (!mres.Wait(3000)) // loop until state is true, checking every 3s
try
//连接
Connect();
mres.Set(); // state set to true - breaks out of loop
catch (Exception ex)
Console.WriteLine("RabbitMQ尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex);
catch (Exception ex)
Console.WriteLine("RabbitMQ尝试重新连接RabbitMQ服务器出现错误:" + ex.Message, ex);
static void Cleanup()
try
if (_model != null && _model.IsOpen)
try
_model.Close();
catch (Exception ex)
Console.WriteLine("RabbitMQ重新连接,正在尝试关闭之前的Channel[接收],但遇到错误", ex);
_model = null;
if (_conn != null && _conn.IsOpen)
try
_conn.Close();
catch (Exception ex)
Console.WriteLine("RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误", ex);
_conn = null;
// 重置OPC连接
if (OpcManager.GetInstance.Instance != null)
OpcManager.GetInstance.Instance = null;
catch (IOException ex)
throw ex;
private static void RabbitmqMessageConsume()
try
if (_conn == null || !_conn.IsOpen) throw new Exception("连接为空或连接已经关闭");
if (_model == null || !_model.IsOpen) throw new Exception("通道为空或通道已经关闭");
bool queueDurable = true;
//在MQ上定义一个持久化队列,如果名称相同不会重复创建
_model.QueueDeclare(ChannelName, queueDurable, false, false, null);
//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
_model.BasicQos(0, 1, true);
//创建基于该队列的消费者,绑定事件
var consumer = new EventingBasicConsumer(_model);
//回应消息监控
consumer.Received += SyncData_Received;
//绑定消费者
_model.BasicConsume(ChannelName, //队列名
false, //false:手动应答;true:自动应答
consumer);
Console.WriteLine("开始监控RabbitMQ服务器,队列" + ChannelName);
catch (AggregateException ae)
//错误信息去重
var errorList = (from error in ae.InnerExceptions select error.Message).Distinct().ToList();
//打印所有错误信息
foreach (string error in errorList)
Console.WriteLine(error);
catch (Exception ex)
throw ex;
private static void SyncData_Received(object sender, BasicDeliverEventArgs e)
var watch = new Stopwatch();
watch.Start();
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.Write("尝试读取OPC变量: ");
//var obj = JsonConvert.DeserializeObject<string[]>(message);
//var tags = HenryMes.OpcManager.GetInstance.Instance.Read<string>(obj);
//每页条数
var nodes = JsonConvert.DeserializeObject<string[]>(message).ToList();
const int pageSize = 500;
//页码 0也就是第一条
int pageNum = 0;
var tasks = new List<Task>();
while (pageNum * pageSize < nodes.Count)
var pageNodes = nodes.Skip(pageNum * pageSize).Take(pageSize).Select(t => t).ToArray();
tasks.Add(Task.Factory.StartNew(() =>
var tags = OpcManager.GetInstance.Instance.Read<string>(pageNodes);
));
pageNum++;
Task.WaitAll(tasks.ToArray());
// 确认收到
_model.BasicAck(e.DeliveryTag, false);
watch.Stop();
TimeSpan timeSpan = watch.Elapsed;
Console.WriteLine("处理耗时0ms.", watch.ElapsedMilliseconds);
static void Main(string[] args)
Reconnect();
Console.ReadKey();
以上是关于RabbitMQ 消费者断线重连的主要内容,如果未能解决你的问题,请参考以下文章