C#RabbitMQ进阶指南
Posted JimCarter
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#RabbitMQ进阶指南相关的知识,希望对你有一定的参考价值。
文章目录
- 1. 消息何去何从
- 2. 过期时间设置(Time to Live, TTL)
- 3.死信队列(Dead-Letter-Exchange,DLX)
- 4. 延迟队列
- 5. 优先级队列
- 6. 持久化
- 7. 生产者确认
- 8. 消息的顺序性
- 9. 消息传输保障
1. 消息何去何从
细心的你应该会发现当发布消息时,BasicPublish
方法还有一个mandatory
参数:
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
在RabbitMQ 3.0之前的版本中还会有一个名为immediate
的参数,这两个参数的作用都是当消息传递不可达时,将这条消息返回给生产者。
二者的区别是:
mandatory
:为true时。当exchange无法根据消息类型或者routingKey找到一个符合条件的队列时,就将消息返回给生产者。使用方法:channel.BasicPublish("", "hello", true, null, body); channel.BasicReturn += (sender, e) => var body = e.Body; ;
immediate
:为true时。当exchange将消息发送到队列之前,检查发现队列上没有任何消费者时,就检查下一个符合要求的队列是否有消费者。如果所有符合要求的队列都没有消费者,则将此消息返回给生产者。这个参数会影响队列性能,所以后续已被移除。建议使用TTL和DLX的方法替代。
1.1 备选交换机(Alternate Exchange,AE)
作用:当不想设置mandatory
和immediate
且也不想让消息丢失时,就可以声明一个备选交换机。
在声明交换机时,通过添加alternate-exchange
参数使其成为一个备选交换机:
string normalExchangeName = "normalEx", backupExchangeName = "backupEx";
var arguments=new Dictionary<string,object>();
arguments.Add("alternate-exchange", backupExchangeName);
//声明normalEx交换机,且设置其备选交换机为backupEx
channel.ExchangeDeclare(normalExchangeName, ExchangeType.Direct, true, false, arguments);
//声明backupEx交换机
channel.ExchangeDeclare(backupExchangeName, ExchangeType.Fanout, true, false, null);
工作流程如下:
注意:
- 如果备选交换机没有绑定任何队列或者消息没有匹配到任何一个队列,则此消息会丢失,而且不抛出异常。
- 如果备选交换机和
mandatory
都设置了,则后者无效。
2. 过期时间设置(Time to Live, TTL)
可以分别设置消息的过期时间和队列的过期时间
2.1 设置消息的过期时间
可通过以下两种方法设置消息的过期时间:
- 通过队列的属性设置,这样该队列的所有消息都有同样的过期时间。
- 单独设置某个消息的过期时间。
如果以上两种方法都进行了设置,则消息的过期时间以最短的为准。当消息过期之后,就会变成“死信”(Dead Message),除非将消息放入死信队列,否则消费者无法再接收到此消息。
2.1.1 通过队列设置
声明队列时加入x-message-ttl
即可:
var arguments=new Dictionary<string,object>();
arguments.Add("x-message-ttl", 6000);//单位是毫秒
channel.QueueDeclare("name",true,false,false,arguments);
不设置ttl表示当前消息永远不会过期,设置ttl=0则表示除非此时可以将消息直接投递给消费者,否则该消息会被立即丢弃。
当消息过期后,会立即从队列中移除。
2.1.2 单独设置某个消息的ttl
publish时进行设置:
var props = channel.CreateBasicProperties();
props.DeliveryMode = 2;
props.Expiration = "6000";//6000毫秒
channel.BasicPublish("exchange", "routingkey", false, props, body);
当消息过期时间到时并不会立即从队列中移除,指导要投递给消费者前才判断消息是否过期应当被移除。
注意:
这里的移除逻辑与上一方法的移除逻辑并不同,原因是队列中过期消息肯定都在队列的头部,只需要从头按个进行扫描判断是否过期即可。而这种方法里的过期时间每个消息都不相同,如要进行移除势必要扫描整个队列。
2.2 设置队列的过期时间
声明队列时可以设置x-expires
参数以控制队列被自动删除前处于未使用状态的时间,未使用状态的定义是:队列上没有任何消费者,队列也没有被重新声明,并且在过期时间段内也没有调用过BasicGet
方法。
RabbitMQ会保证队列过期后会将其删除,但是不保证会立即删除。RabbitMQ重启之后,持久化队列的过期时间会被重新计算。
var arguments = new Dictionary<string, object>();
arguments.Add("x-expires", 6000);//单位毫秒,不可设置为0
channel.QueueDeclare("name", true, false, false, arguments);
3.死信队列(Dead-Letter-Exchange,DLX)
死信交换机上绑定的队列就叫死信队列。当一个消息变成死信之后就可以被重新发送到死信交换机上。消息是死信的条件是:
- 消息被拒绝(BasicReject或BasicNack)且requeue参数为false。
- 消息过期
- 队列达到最大长度
死信交换机与普通的交换机没什么区别,只是声明队列的时候多加了一个名为x-dead-letter-exchange
的参数,指定了其对应的死信交换机时哪一个。当改队列中存在死信时,RabbitMQ会自动将此消息重新发布到对应的死信交换机上。
//声明死信交换机(就是一普通交换机)
channel.ExchangeDeclare("dlx_name", ExchangeType.Direct);
var queueArgs = new Dictionary<string, object>();
//设置对应的死信交换机名为dlx_name
queueArgs.Add("x-dead-letter-exchange", "dlx_name");
//可选。指定死信一个新的routingKey,如果没设置则还沿用以前的。
queueArgs.Add("x-dead-letter-routing-key", "new_routing_key");
channel.QueueDeclare("queueName", false, false, false, queueArgs);
工作流程如下:
4. 延迟队列
延迟队列存储的是延迟消息,延迟消息是指消息发出之后并不想让消费者立即拿到消息而是等待特定时候之后,才让消费者拿到。使用场景主要有:
- 订单系统中,用户下单之后有30分钟的付款时间,超时之后才关闭订单。
- 用户希望用手机控制家里的设备在指定的时间进行工作。此时就可以将消息发送到延时队列,时间到了之后才会将消息推送给设备。
RabbitMQ本省并不直接支持延迟队列,当时你可以通过结合超时时间TTL+死信队列DLX来实现此功能。
5. 优先级队列
其实称作优先级消息更为贴切。本质是首先通过x-max-priority
来设置队列最大的优先级,然后publish时设置每条消息的优先级。
//首先设置队列最大的优先级
var queueArgs = new Dictionary<string, object>();
queueArgs.Add("x-max-priority", 10);
channel.QueueDeclare("name",true,false,false,queueArgs);
//其次设置每个消息的优先级
var props = channel.CreateBasicProperties();
props.Priority = 5;//默认是0,表示最低。最高是10,即队列的优先级
channel.BasicPublish("exchange", "routingkey", false, props, body);
优先级高的消息会被优先消费,但前提是消费速度赶不上生产速度。否则队列没有消息积压,优先消费也无从谈起。
6. 持久化
RabbitMQ的持久化有以下三个部分:
- 交换机的持久化:声明交换机时将durable参数设置为true。如果不持久化,当RabbitMQ服务重启之后,交换机的元数据就会丢失,但消息不会丢失,后续无法将消息继续发送到这交换机中了。对一个长期使用的交换机来说建议持久化。
- 队列的持久化:声明队列时将durable参数设置为true。否则当RabbitMQ重启时,队列的元数据将会丢失,队列里的数据也会丢失。虽然队列持久化之后,可以保证队列的元数据不丢失,但不能保证队列的数据也不丢失。
- 消息数据持久化:参考上面的案例,publish时将
props.DeliveryMode = 2;
即可实现消息的持久化
综上,我们可以得出结论,如果想保证消息不丢失,需要设置队列+消息的持久化。二者任何一个没有设置,RabbitMQ重启之后消息都会丢失。
但如果三者都设置了持久化,就一定能保证消息不丢失吗?答案是不一定,原因有以下两点:
- 如果是自动ack,则消费者可能还没正确处理完消息自己就挂了,该消息没有得到有效处理,相当于丢失。
- 持久化的消息存入RabbitMQ之后,还需要一段很短的时间才能存入磁盘。在这个时间段内如果RabbitMQ挂了,也会导致消息丢失。
解决办法主要有两种:
- 使用镜像队列机制:生产上一般都会配。当master节点挂掉后,会自动切换到slave节点。虽然无法完全保证消息不丢失,但是可靠性会高很多。
- 引入生产者确认机制。
7. 生产者确认
生产者确认用来检测消息有没有正确发送到MQ(发送到MQ的交换机中)。提供了两种确认方案:
- 事务机制:性能不好
- 发送方确认机制:轻量
7.1 事务机制
事务只要设涉及到以下三个方法:
channel.TxSelect()
: 设置当前的channel为事务模式channel.TxCommit()
:提交事务,如果事务提交成功,则消息一定到达了MQ。channel.TxRollback()
:回滚事务
使用方法如下:
try
channel.TxSelect();
for(int i=0;i<10;i++)
//发送10个消息
channel.BasicPublish();
channel.TxCommit();
catch
channel.TxRollback();
提交事务对应的AMQP协议流转如下:
回滚事务对应的AMQP协议流转如下:
7.2 发送方确认机制
通过将channle设置为confirm模式来实现。设置完成之后,该信道上发布的消息都会被指派一个唯一ID,一旦消息被确认投递到匹配的队列中,MQ就会发送一个BasicAck给生产者(包含了唯一ID)。如果消息和队列时持久化的,那么当消息被写入磁盘之后,MQ才会发送BasicAck.
事务机制每次发送一条消息都需要等待MQ回应之后,才能发送下一条。发送方确认支持异步,在发送一条消息之后,在等待MQ回应的同时可以继续发布,生产者通过回调来确认消息是否发送成功。
事务机制与发送方确认机制二者互斥,同时设置会抛出异常。
如果channel不卡其confirm模式,而直接调用
WaitForConfirmsOrDie
等方法时,会抛出异常。
7.2.1 发布一条消息就等待一次确认结果
发布一条消息之后阻塞并等待确认结果
channel.ConfirmSelect();//设置为confirm模式
while (ThereAreMessagesToPublish())
byte[] body = ...;
BasicProperties properties = ...;
channel.BasicPublish(exchange, queue, properties, body);
//5s超时
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
这里我们发布了一条消息,并用WaitForConfirmsOrDie
方法阻塞当前线程并等待结果,当收到结果之后此方法会立即返回。并设置了5s的超时时间,所以这里的等待时间最多就是5s。如果消息在指定的超时时间内没有得到确认,这个方法会抛出一个异常。通过捕获这个异常可以进行重试。
缺点也很明显:因为有等待需要阻塞线程,所以并发量并不大。
7.2.2 批量发布N条消息等待一次确认结果
var batchSize = 100;
var outstandingMessageCount = 0;
channel.ConfirmSelect();//设置为confirm模式
while (ThereAreMessagesToPublish())
byte[] body = ...;
BasicProperties properties = ...;
channel.BasicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize)
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
outstandingMessageCount = 0;
if (outstandingMessageCount > 0)
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
相比于发布一条等待一次来说,批量发布等待提高了系统的并发处理能力。但是缺点是这个等待仍然会阻塞线程而且发布失败后我们并不知道是哪条消息失败了,重新重新把这一批消息再发送一遍,导致性能下降。
7.2.3 异步发布确认
通过回调实现:
var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, args) =>
//已确认
;
channel.BasicNacks += (sender, args) =>
//未确认
;
args
对象有两个属性:
- delivery tag:publish的消息的序号(SN)。
- multiple: 为false时表示只有1条消息得到了确认/未确认。为true时表示所有的消息得到了确认或未确认。
8. 消息的顺序性
指消费者消费到的消息和发送者发送的消息的顺序是一致的。目前有很多资料都说RabbitMQ能够保证消息的顺序性,其实这只是理想的情况下,即:
- 不使用MQ的高级特性,如:优先级队列,TTL,死信队列
- 没有消息丢失
- 只有一个消费者
- 只有一个生产者,否则无法确定消息达到MQ的顺序。
为了保证消息的顺序性,需要业务代码上添加全局有序标识(Sequence ID)来实现。
9. 消息传输保障
一般消息中间件的消息传输保障分为三个层级:
- 最多一次:即0次或1次,消息可能会丢失,但是决定不重复。
- 最少一次:即1次或N次,消息绝不丢失,但是可能重复。
需要开启事务或者发送方确认机制+生产者使用mandatory
参数或者备选交换机+队列和数据都持久化+手动ack实现。 - 恰好一次:消息有且仅传输一次。大多数消息中间件都不支持,RabbitMQ也不支持,需要业务系统去实现,如业务消息本身具有幂等性、借助Redis
以上是关于C#RabbitMQ进阶指南的主要内容,如果未能解决你的问题,请参考以下文章