RabbitMqHelper 消息队列帮助类

Posted request

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMqHelper 消息队列帮助类相关的知识,希望对你有一定的参考价值。

using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQ_Send

class ConfigModel

public enum ExchangeTypeEnum

/// <summary>
/// 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
/// 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
/// </summary>
fanout = 1,

/// <summary>
/// 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
/// 。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,
/// 则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
/// </summary>
direct = 2,

/// <summary>
/// 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
/// 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
/// 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
/// </summary>
topic = 3,

header = 4


/// <summary>
/// 数据被执行后的处理方式
/// </summary>
public enum ProcessingResultsEnum

/// <summary>
/// 处理成功
/// </summary>
Accept,

/// <summary>
/// 可以重试的错误
/// </summary>
Retry,

/// <summary>
/// 无需重试的错误
/// </summary>
Reject,

/// <summary>
/// 消息队列的配置信息
/// </summary>
public class RabbitMqConfigModel

#region host
/// <summary>
/// 服务器IP地址
/// </summary>
public string IP get; set;

/// <summary>
/// 服务器端口,默认是 5672
/// </summary>
public int Port get; set;

/// <summary>
/// 登录用户名
/// </summary>
public string UserName get; set;

/// <summary>
/// 登录密码
/// </summary>
public string Password get; set;
/// <summary>
/// 虚拟主机名称
/// </summary>
public string VirtualHost get; set;
#endregion

#region Queue
/// <summary>
/// 队列名称
/// </summary>
public string QueueName get; set;

/// <summary>
/// 是否持久化该队列
/// </summary>
public bool DurableQueue get; set;
#endregion

#region exchange
/// <summary>
/// 路由名称
/// </summary>
public string ExchangeName get; set;

/// <summary>
/// 路由的类型枚举
/// </summary>
public ExchangeTypeEnum ExchangeType get; set;

/// <summary>
/// 路由的关键字
/// </summary>
public string RoutingKey get; set;

#endregion

#region message
/// <summary>
/// 是否持久化队列中的消息
/// </summary>
public bool DurableMessage get; set;
#endregion

/// <summary>
/// 基类
/// </summary>
public class BaseService

public static IConnection _connection;

/// <summary>
/// 服务器配置
/// </summary>
public RabbitMqConfigModel RabbitConfig get; set;


#region 构造函数
/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public BaseService(RabbitMqConfigModel config)

try

RabbitConfig = config;
CreateConn();

catch (Exception)

throw;


#endregion

#region 方法
#region 初始化
/// <summary>
/// 创建连接
/// </summary>
public void CreateConn()

ConnectionFactory cf = new ConnectionFactory();
cf.Port = RabbitConfig.Port; //服务器的端口
cf.Endpoint = new AmqpTcpEndpoint(new Uri("amqp://" + RabbitConfig.IP + "/")); //服务器ip
cf.UserName = RabbitConfig.UserName; //登录账户
cf.Password = RabbitConfig.Password; //登录账户
cf.VirtualHost = RabbitConfig.VirtualHost; //虚拟主机
cf.RequestedHeartbeat = 60; //虚拟主机

_connection = cf.CreateConnection();

#endregion

#region 发送消息
/// <summary>
/// 发送消息,泛型
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="message"></param>
/// <returns></returns>
public bool Send<T>(T messageInfo, ref string errMsg)

if (messageInfo == null)

errMsg = "消息对象不能为空";
return false;

string value = JsonConvert.SerializeObject(messageInfo);
return Send(value, ref errMsg);

/// <summary>
/// 发送消息,string类型
/// </summary>
/// <param name="message"></param>
/// <param name="errMsg"></param>
/// <returns></returns>
public bool Send(string message, ref string errMsg)

if (string.IsNullOrEmpty(message))

errMsg = "消息不能为空";
return false;

try

if (!_connection.IsOpen)

CreateConn();

using (var channel = _connection.CreateModel())

//推送消息
byte[] bytes = Encoding.UTF8.GetBytes(message);

IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = Convert.ToByte(RabbitConfig.DurableMessage ? 2 : 1); //支持可持久化数据

if (string.IsNullOrEmpty(RabbitConfig.ExchangeName))

//使用自定义的路由
channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableMessage, false, null);
channel.BasicPublish("", RabbitConfig.QueueName, properties, bytes);

else

//申明消息队列,且为可持久化的,如果队列的名称不存在,系统会自动创建,有的话不会覆盖
channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
channel.BasicPublish(RabbitConfig.ExchangeName, RabbitConfig.RoutingKey, properties, bytes);

return true;


catch (Exception ex)

errMsg = ex.Message;
return false;


#endregion

public class RabbitBasicService : BaseService

/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public RabbitBasicService(RabbitMqConfigModel config)
: base(config)


/// <summary>
/// 接受消息,使用Action进行处理
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="method"></param>
public void Receive<T>(Func<string, bool> method)

try

using (var channel = _connection.CreateModel())

//申明队列
channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
//使用路由
if (!string.IsNullOrEmpty(RabbitConfig.ExchangeName))

//申明路由
channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableQueue);
//队列和交换机绑定
channel.QueueBind(RabbitConfig.QueueName, RabbitConfig.ExchangeName, RabbitConfig.RoutingKey);

//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
channel.BasicQos(0, 1, false);
//在队列上定义一个消费者
var customer = new QueueingBasicConsumer(channel);
//var customer = new EventingBasicConsumer (channel);

//消费队列,并设置应答模式为程序主动应答
channel.BasicConsume(RabbitConfig.QueueName, false, customer);

while (true)//timer

//阻塞函数,获取队列中的消息
ProcessingResultsEnum processingResult = ProcessingResultsEnum.Retry;
ulong deliveryTag = 0;
try

//Thread.Sleep(10);

var ea = customer.Queue.Dequeue();
deliveryTag = ea.DeliveryTag;
byte[] bytes = ea.Body;
string body = Encoding.UTF8.GetString(bytes);
// T info = JsonConvert.DeserializeObject<T>(body);
method(body);
processingResult = ProcessingResultsEnum.Accept;

catch (Exception ex)

processingResult = ProcessingResultsEnum.Reject; //系统无法处理的错误

finally

switch (processingResult)

case ProcessingResultsEnum.Accept:
//回复确认处理成功
channel.BasicAck(deliveryTag,
false);//处理单挑信息
break;
case ProcessingResultsEnum.Retry:
//发生错误了,但是还可以重新提交给队列重新分配
channel.BasicNack(deliveryTag, false, true);
break;
case ProcessingResultsEnum.Reject:
//发生严重错误,无法继续进行,这种情况应该写日志或者是发送消息通知管理员
channel.BasicNack(deliveryTag, false, false);
//写日志
break;




catch (Exception ex)




#endregion

以上是关于RabbitMqHelper 消息队列帮助类的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ:一条待处理消息但队列为空

消息模型:主题和队列的区别

系统学习消息队列分享 怎样系统学习消息队列?

需要一个线程安全的异步消息队列

微服务中使用MQ——RabbitMQ

Laravel使用数据库队列给用户发送通知