RabbitMQ操作代码封装
Posted huangzelin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ操作代码封装相关的知识,希望对你有一定的参考价值。
1、RabbitMqPublisher.cs (发送消息)
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace X.RabbitMQ public class RabbitMqPublisher private readonly string _rabbitMqUri; /// <summary> /// 构造函数 /// </summary> /// <param name="rabbitMqUri">连接串,如 amqp://guest:[email protected]:5672/</param> public RabbitMqPublisher(string rabbitMqUri) this._rabbitMqUri = rabbitMqUri; /// <summary> /// 创建连接 /// </summary> private IConnection CreateConnection() var factory = new ConnectionFactory Uri = new Uri(_rabbitMqUri) ; return factory.CreateConnection(); /// <summary> /// 创建信道 /// </summary> private IModel CreateChannel(IConnection con, string exchangeName, string exchangeType, string queueName, string routeKey) var channel = con.CreateModel(); channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null); if (!string.IsNullOrEmpty(queueName)) channel.QueueDeclare(queueName, true, false, false, null); //创建一个消息队列,用来存储消息 channel.QueueBind(queueName, exchangeName, routeKey, null); channel.BasicQos(0, 3, true); //在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息 return channel; /// <summary> /// 发送ExchangeType类型为Direct的消息 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="routeKey">消息路由key</param> /// <param name="message">消息实体</param> /// <param name="queueName">缺省队列名(不存在则自动创建),设置后可避免消息发送后由于没有队列接收而丢失的问题</param> /// <returns></returns> public bool PublishDirectMessage(string exchangeName, string routeKey, string message, string queueName = "") return this.PublishMessage(exchangeName, ExchangeType.Direct, queueName, routeKey, new[] message ); /// <summary> /// 批量发送ExchangeType类型为Direct的消息 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="routeKey">消息路由key</param> /// <param name="messages">消息实体</param> /// <param name="queueName">缺省队列名(不存在则自动创建),设置后可避免消息发送后由于没有队列接收而丢失的问题</param> /// <returns></returns> public bool PublishDirectMessages(string exchangeName, string routeKey, IEnumerable<string> messages, string queueName = "") return this.PublishMessage(exchangeName, ExchangeType.Direct, queueName, routeKey, messages); /// <summary> /// 发送ExchangeType类型为Fanout的消息 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="message">消息实体</param> /// <param name="queueName">缺省队列名(不存在则自动创建),设置后可避免消息发送后由于没有队列接收而丢失的问题</param> /// <returns></returns> public bool PublishFanoutMessage(string exchangeName, string message, string queueName = "") return this.PublishMessage(exchangeName, ExchangeType.Fanout, queueName, "", new[] message ); /// <summary> /// 批量发送ExchangeType类型为Fanout的消息 /// </summary> /// <param name="exchangeName">交换机名称</param> /// <param name="messages">消息实体</param> /// <param name="queueName">缺省队列名(不存在则自动创建),设置后可避免消息发送后由于没有队列接收而丢失的问题</param> /// <returns></returns> public bool PublishFanoutMessages(string exchangeName, IEnumerable<string> messages, string queueName = "") return this.PublishMessage(exchangeName, ExchangeType.Fanout, queueName, "", messages); private bool PublishMessage(string exchangeName, string exchangeType, string queueName, string routeKey, IEnumerable<string> messages) using (var con = CreateConnection()) using (var channel = CreateChannel(con, exchangeName, exchangeType, queueName, routeKey)) channel.ConfirmSelect();//启用消息发送确认机制 foreach (var message in messages) var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //使消息持久化 channel.BasicPublish(exchangeName, routeKey, properties, body); return channel.WaitForConfirms();
2、RabbitMqQuery.cs (队列消息拉取)
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace X.RabbitMQ public class RabbitMqQuery private readonly string _rabbitMqUri; /// <summary> /// 构造函数 /// </summary> /// <param name="rabbitMqUri">连接串,如 amqp://guest:[email protected]:5672/</param> public RabbitMqQuery(string rabbitMqUri) this._rabbitMqUri = rabbitMqUri; /// <summary> /// 创建连接 /// </summary> private IConnection CreateConnection() var factory = new ConnectionFactory Uri = new Uri(_rabbitMqUri) ; return factory.CreateConnection(); /// <summary> /// 拉取队列中的数据 /// </summary> /// <param name="queueName">队列名</param> /// <returns></returns> public string GetMessage(string queueName) using (var con = this.CreateConnection()) var channel = con.CreateModel(); var rs = channel.BasicGet(queueName, true); if (rs != null) var body = rs.Body; return Encoding.UTF8.GetString(body); return ""; /// <summary> /// 批量拉取队列中的数据 /// </summary> /// <param name="queueName">队列名</param> /// <param name="queryCount">拉取数据的条数,默认为1</param> /// <returns></returns> public string[] GetMessages(string queueName, int queryCount = 1) if (queryCount <= 0) queryCount = 1; var msgLst = new List<string>(); using (var con = this.CreateConnection()) var channel = con.CreateModel(); for (int i = 0; i < queryCount; i++) var rs = channel.BasicGet(queueName, true); if (rs != null) var body = rs.Body; msgLst.Add(Encoding.UTF8.GetString(body)); else break; return msgLst.ToArray();
3、RabbitMqListener.cs (消息监听与推送)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; using X.Log; namespace X.RabbitMQ /// <summary> /// RabbitMq消息监听器 /// </summary> public class RabbitMqListener : IDisposable private ConnectionFactory _factory; private IConnection _con; private IModel _channel; private EventingBasicConsumer _consumer; private readonly string _rabbitMqUri; private readonly string _exchangeType; private readonly string _exchangeName; private readonly string _queueName; private readonly string _routeKey; private Func<string, bool> _messageHandler; /// <summary> /// 释放标记 /// </summary> private bool disposed; ~RabbitMqListener() Dispose(false); /// <summary> /// RabbitMQ消息监听器,若指定的队列不存在,则自动创建队列。并在消息交换机上绑定指定的消息路由规则(路由key) /// </summary> /// <param name="rabbitMqUri">连接串,如 amqp://guest:[email protected]:5672/</param> /// <param name="exchangeName">消息交换机</param> /// <param name="exchangeType">交换机类型,如 ExchangeType.Direct</param> /// <param name="queueName">要监听的队列</param> /// <param name="routeKey">消息路由key</param> public RabbitMqListener(string rabbitMqUri, string exchangeName, string exchangeType, string queueName, string routeKey = "") this._rabbitMqUri = rabbitMqUri; this._exchangeName = exchangeName; this._exchangeType = exchangeType; this._queueName = queueName; this._routeKey = routeKey; /// <summary> /// 创建连接 /// </summary> private void CreateConnection() _factory = new ConnectionFactory Uri = new Uri(_rabbitMqUri), RequestedHeartbeat = 20,//与服务器协商使用的心跳超时间隔(以秒为单位)。 AutomaticRecoveryEnabled = true,//开启网络异常重连机制 NetworkRecoveryInterval = TimeSpan.FromSeconds(10),//设置每10s重连一次网络 TopologyRecoveryEnabled = true //开启重连后恢复拓扑(交换,队列,绑定等等)。 ; _con = _factory.CreateConnection(); _con.ConnectionShutdown += (_sender, _e) => ReMessageListen();//掉线重新连接并监听队列消息 /// <summary> /// 创建信道 /// </summary> private void CreateChannel() _channel = _con.CreateModel(); _channel.ExchangeDeclare(_exchangeName, _exchangeType, true, false, null); _channel.QueueDeclare(_queueName, true, false, false, null); //创建一个消息队列,用来存储消息 _channel.QueueBind(_queueName, _exchangeName, _routeKey, null); _channel.BasicQos(0, 3, true); //在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息 /// <summary> /// 监听队列消息 /// </summary> /// <param name="messageHandler">消息处理器,当监测到队列消息时回调该处理器</param> /// <returns>监听状态</returns> public bool MessageListen(Func<string, bool> messageHandler) try this.CreateConnection(); this.CreateChannel(); _consumer = new EventingBasicConsumer(_channel); //基于事件的消息推送方式 _consumer.Received += (_sender, _e) => string msg = Encoding.UTF8.GetString(_e.Body); if (messageHandler != null) this._messageHandler = messageHandler; try var isOk = this._messageHandler(msg); if (isOk) _channel.BasicAck(_e.DeliveryTag, false); catch (Exception ex) LoggerManager.ErrorLog.Error("消息处理器执行异常:" + ex.Message, ex); ; _channel.BasicConsume(_queueName, false, _consumer); //手动确认 return true; catch (Exception ex) LoggerManager.ErrorLog.Error("尝试监听队列消息出现错误:" + ex.Message, ex); return false; private void ReMessageListen() try //清除连接及频道 CleanupResource(); var mres = new ManualResetEventSlim(false); //初始化状态为false while (!mres.Wait(3000)) //每3秒监测一次状态,直到状态为true if (MessageListen(_messageHandler)) mres.Set(); //设置状态为true并跳出循环 catch (Exception ex) LoggerManager.ErrorLog.Error("尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex); /// <summary> /// 清理资源 /// </summary> private void CleanupResource() if (_channel != null && _channel.IsOpen) try _channel.Close(); catch (Exception ex) LoggerManager.ErrorLog.Error("尝试关闭RabbitMQ信道遇到错误", ex); _channel = null; if (_con != null && _con.IsOpen) try _con.Close(); catch (Exception ex) LoggerManager.ErrorLog.Error("尝试关闭RabbitMQ连接遇到错误", ex); _con = null; protected virtual void Dispose(bool disposing) if (disposed) return; CleanupResource(); disposed = true; public void Dispose() Dispose(true); GC.SuppressFinalize(this);
4、RabbitMQ.Test(测试代码)
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace X.RabbitMQ.Test class Program private const string RabbitHostUri = "amqp://guest:[email protected]:5672/"; private const string ExchangeName = "x.mq.test"; private const string QueueName = "x.mq.test.queue"; private const string RouteKey = "x.mq.test.key"; static void Main(string[] args) var publkisher = new X.RabbitMQ.RabbitMqPublisher(RabbitHostUri); var isOk = publkisher.PublishDirectMessage(ExchangeName, RouteKey, "hello..."); isOk = publkisher.PublishDirectMessage(ExchangeName, RouteKey, "ggggggg."); Console.WriteLine(isOk ? "is ok" : "is not ok"); Console.ReadKey(); var mqQuery = new X.RabbitMQ.RabbitMqQuery(RabbitHostUri); var ss = mqQuery.GetMessages(QueueName, 10); foreach (var s in ss) Console.WriteLine(s); Console.ReadKey(); using (var mqListener = new X.RabbitMQ.RabbitMqListener(RabbitHostUri, ExchangeName, "direct", QueueName, RouteKey)) mqListener.MessageListen(msg => Console.WriteLine(msg); return true; ); Console.WriteLine("按任意键退出程序..."); Console.ReadKey();
以上是关于RabbitMQ操作代码封装的主要内容,如果未能解决你的问题,请参考以下文章