RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)相关的知识,希望对你有一定的参考价值。


摘要

本博文将介绍数据可靠性的一些细节,并展示RabbitMQ的几种已具备或衍生的高级特性,包括TTL、死信队列、延迟队列、优先级队列、RPC等,这些功能在实际使用中可以让某些应用的实现变得事半功倍。同时对源码进行详细的分析。

RabbitMQ的消息工作流

mandatory 和immediate 是channel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ 提供的备份交换器(Altemate Exchange) 可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

mandatory 参数

  • 当mandatory 参数设为true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用Basic.Return 命令将消息返回给生产者。
  • 当mandatory 参数设置为false 时,出现上述情形,则消息直接被丢弃。

那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel . addReturnListener 来添加ReturnListener 监昕器实现。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_优先级

上面代码中生产者没有成功地将消息路由到队列,此时RabbitMQ 会通过Basic.Return,返回" mandatory test " 这条消息,之后生产者客户端通过ReturnListener 监昕到了这个事
件,上面代码的最后输出应该是" Basic. Retum 返回的结果是: mandatory test "。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_rabbitmq_02

immediate 参数

当imrnediate 参数设为true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,
该消息会通过Basic.Return 返回至生产者。概括来说, ma口datory 参数告诉服务器至少将该消息路由到一个队列中, 否则将消息返回给生产者。imrnediate 参数告诉服务器, 如果该消息关联的队列上有消费者, 则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者, 不用将消息存入队列而等待消费者了。RabbitMQ 3 .0 版本开始去掉了对imrnediate 参数的支持,对此RabbitMQ 官方解释是:imrnediate 参数会影响镜像队列的性能, 增加了代码复杂性, 建议采用TTL 和DLX 的方法替代。

RabbitMQ的备份交换机

备份交换器,英文名称为Altemate Exchange ,简称庙,或者更直白地称之为"备胎交换器"。生产者在发送消息的时候如果不设置mandatory 参数, 那么消息在未被路由的情况下将会丢失:如果设置了mandatory 参数,那么需要添加ReturnListener 的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ 中,再在需要的时候去处理这些消息。可以通过在声明交换器(调用channel.exchangeDeclare 方法)的时候添加alternate-exchange 参数来实现,也可以通过策略(Policy)的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉Policy 的设置。

实例:如果此时发送一条消息到nonnalExchange 上,当路由键等于" nonnalKey" 的时候,消息能正确路由到nonnalQueue 这个队列中。如果路由键设为其他值,比如"errorKey "即消息不能被正确地路由到与nonnallixchange 绑定的任何队列上,此时就会发送给myAe ,进而发送到unroutedQueue 这个队列。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_rabbitmq_03

同样,如果采用Policy 的方式来设置备份交换器,可以参考如下:

rabbitmqctl set_policy AE " ^norma lExchange ♀"、"alternate-exchange": "myAE"

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout 类型,如若读者想设置为direct 或者topic 的类型也没有什么不妥。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。

考虑这样一种情况,如果备份交换器的类型是direct , 并且有一个与其绑定的队列,假设绑定的路由键是keyl , 当某条携带路由键为key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为keyl,则可以存储到队列中。

对于备份交换器,总结了以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 令如果备份交换器没有任何匹配的队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和mandatory 参数一起使用,那么mandatory 参数无效。

RabbitMQ的TTL

TTL, Time to Live 的简称,即过期时间。RabbitMQ 可以对消息和队列设置TTL 。

设置消息的TTL

目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL 可以不同。如果两种方法一起使用,则消息的TTL 以两者之间较小的那个数值为准。消息在队列中的生存时I司一旦超过设置的TTL 值时,就会变成"死信" (Dead Message) ,消费者将无法再收到该消息。通过队列属性设置消息TTL 的方法是在channel.queueDeclare 方法中加入x-message -ttl 参数实现的,这个参数的单位是毫秒。

Map<String, Object> argss = new HashMap<String , Object>();
argss.put("x-message-ttl " , 6000);
channel . queueDeclare(queueName , durable , exclusive , autoDelete , argss) ;
同时也可以通过Policy 的方式来设置TTL.示例如下:

rabbitmqctl set_policy TTL "食" "message-ttl":60000 --apply-to queues

还可以通过调用HTTPAPI 接口设置:

$ curl -i -u root:root -H "content-type:application/json"-X PUT
-d"auto_delete":false , "durable":true , "arguments":"x-message-ttl": 60000
http://localhost:15672/api/queues/vhost/queuename

如果不设置TTL.则表示此消息不会过期;如果将TTL 设置为0 ,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代RabbitMQ 3.0 版本之前的immediate 参数,之所以部分代替,是因为immediate 参数在投递失败时会用Basic.Return 将消息返回。针对每条消息设置TTL 的方法是在channel.basicPublish 方法中加入expiration的属性参数,单位为毫秒。

AMQP . BasicProperties.Builder builder = new AMQP.BasicProperties . Builder();
builder . deliveryMode(2); / / 持久化消息
builder . expiration( " 60000 " );/ / 设置TTL=60000ms
AMQP.BasicProperties properties = builder . build() ;
channel.basicPublish(exchangeName , routingKey, mandatory, properties ,
"ttlTestMessage".getBytes());

为什么这两种方法处理的方式不一样?

因为第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进行删除即可。

设置队列的TTL

通过channel.queueDeclare 方法中的x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic . Get 命令。设置队列里的TTL 可以应用于类似RPC 方式的回复队列,在RPC 中,许多队列会被创建出来,但是却是未被使用的。

RabbitMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ 重启后,持久化的队列的过期时间会被重新计算。用于表示过期时间的x-expires 参数以毫秒为单位,井且服从和x-message-ttl一样的约束条件,不过不能设置为0,比如该参数设置为1 000 ,则表示该队列如果在1 秒钟之内未使用则会被删除。

Map<String, Object> args =口ew HashMap<String, Object>) ;

args . put( "x-expires" , 1800000);

channel . queueDeclare("myqueue " , false , false , false , args) ;

RabbitMQ的死信交换机、队列

DLX ,全称为Dead-Letter-Exchange ,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。

消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false;
  • 消息过期;
  • 队列达到最大长度。

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定, 实际上就是设置某个队列的属性。当这个队列中存在死信时, RabbitMQ 就会自动地将这个消息重新发布到设置的DLX 上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。

通过在channel.queueDeclare 方法中设置x-dead-letter-exchange 参数来为这个队列添加DLX.

channel.exchangeDeclare("dlx_exchange " , "dir ect "); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args . put("x-dead-letter-exchange" , " dlx exchange ");

//为队列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);

也可以为这个DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键:
args.put("x-dead-letter-routing-key" , "dlx-routing-key");

当然这里也可以通过Policy 的方式设置:
rabbitmqctl set_policy DLX " 女" "dead-letter- exchange":" dlx_exchange "
--apply-to queues

这里创建了两个交换器exchange.normal 和exchange.dlx , 分别绑定两个队列queue.normal和queue.dlx 。由Web 管理页面(图4-3) 可以看出,两个队列都被标记了"D 飞这个是durable 的缩写,即设置了队列持久化。queue.normal 这个队列还配置了TTL 、DLX 和DLK ,其中DLX 指的是
x-dead-letter-routing-key 这个属性。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_rabbitmq_04

channel .exchangeDeclare("exchange.dlx" , "direct " , true);
channel . exchangeDeclare( "exchange.normal " , " fanout " , true);
Map<String , Object> args = new HashMap<String, Object>( );
args . put("x-message- ttl " , 10000);
args . put( "x-dead-letter-exchange " , "exchange . dlx");
args . put( "x-dead-letter-routing-key" , " routingkey");
channe1 .queueDec1are( "queue.norma1 " , true , fa1se , fa1se , args);
channe1 . queueBind( "queue.normal " , "exchange .normal" , "");
channe1 . queueDec1are( "queue.d1x " , true , false , false , null) ;
channel .queueBind( "queue.dlx" , "exchange.dlx " , Wr outingkey");
channel . basicPublish( "exchange.normal" , " rk " ,
MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx " .getBytes()) ;

生产者首先发送一条携带路由键为" rk " 的消息,然后经过交换器exchange .normal 顺利地存储到队列queue.normal 中。由于队列queue.normal 设置了过期时间为10s , 在这10s 内没有消费者消费这条消息,那么判定这条消息为过期。由于设置了DLX , 过期之时, 消息被丢给交换器exchange.dlx 中,这时找到与exchange.dlx 匹配的队列queue . dlx , 最后消息被存储在queue.dlx 这个死信队列中。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_RPC_05

对于RabbitMQ 来说, DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject) 而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。DLX 配合TTL 使用还可以实现延迟队列的功能。

RabbitMQ的延迟队列

延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。在AMQP 协议中,或者RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的DLX 和TTL 模拟出延迟队列的功能。

延迟队列的使用场景有很多,比如:

  • 在订单系统中, 一个用户下单之后通常有30分钟的时间进行支付,如果30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

在图中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于queue.dlx 这个死信队列来说,同样可以看作延迟队列。假设一个应用中需要将每条消息都设置为10 秒的延迟,生产者通过exchange.normal 这个交换器将发送的消息存储在queue.normal 这个队列中。消费者订阅的并非是queue.normal 这个队列,而是queue.dlx 这个队列。当消息从queue.normal 这个队列中过期之后被存入queue.dlx 这个队列中,消费者就恰巧消费到了延迟10 秒的这条消息。在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为5 秒、10秒、30 秒、1 分钟、5 分钟、10 分钟、30 分钟、1 小时这几个维度,当然也可以再细化一下。,为了简化说明,这里只设置了5 秒、10 秒、30 秒、l 分钟这四个等级。根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。这里队列分别设置了过期时间为5 秒、10 秒、30 秒、1 分钟,同时也分别配置了DLX 和相应的死信队列。当相应的消息过期时,就会转存到相应的死信队列(即延迟队列〉中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_RPC_06

RabbitMQ的优先级队列

优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。可以通过设置队列的x - max - priority 参数来实现。示例代码如代码清单所示。

Map<String, Object> args =口ew HashMap<String, Object>() ;
args . put( "x-rnax-priority " , 10) ;
channel.queueDeclare( " queue . priority" , true , fa1se , false , args) ;

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_优先级_07

上面的代码演示的是如何配置一个队列的最大优先级。在此之后, 需要在发送时在消息中设置消息当前的优先级。

AMQP.BasicProperties.Bui1der builder = new AMQP.BasicProperties.Builder() ;
builder.priority(5) ;
AMQP.BasicProperties properties = builder.build () ;
channel.basicPub1ish("exchange_priority" , "rk_priority" , properties , (messages
").getBytes () ) ;

上面的代码中设置消息的优先级为5 。默认最低为0 ,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个也是有前提的: 如果在消费者的消费速度大于生产者的速度且Broker 中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

RabbitMQ的RPC

RPC是Remote Procedure Call 的简称,即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。RPC 的主要功用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。

通俗点来说,假设有两台服务器A 和B , 一个应用部署在A 服务器上,想要调用B 服务器上应用提供的函数或者方法,由于不在同一个内存空间, 不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。RPC 的协议有很多,比如最早的CORBA 、Java RMI, WebService 的RPC 风格、Hessian 、Thrift 甚至还有Restful API 。

一般在RabbitMQ 中进行RPC 是很简单。客户端发送请求消息,服务端回复响应的消息。为了接收响应的消息,我们需要在请求消息中发送一个回调队列(参考下面代码中的replyTo) 。可以使用默认的队列,

String callbackQueueName = channel.queueDeclare() . getQueue();

BasicProperties props = new BasicProperties .Bu 工lder() . replyTo(callbackQueueName) . build();

channel.basicPublish( "" , " rpc queue " , props , message.getBytes()) ;
// then code to read a respo 口se message from the callback_queue...
  • replyTo: 通常用来设置一个回调队列。
  • correlationId : 用来关联请求( request) 和其调用RPC 之后的回复(response ) 。

如果像上面的代码中一样,为每个RPC 请求创建一个回调队列,则是非常低效的。但是幸运的是这里有一个通用的解决方案一一可以为每个客户端创建一个单一的回调队列。这样就产生了一个新的问题,对于回调队列而言,在其接收到一条回复的消息之后,它并不知道这条消息应该和哪一个请求匹配。这里就用到correlationld 这个属性了, 我们应该为每一个请求设置一个唯一的correlationld 。之后在回调队列接收到回复的消息时,可以根据这个属性匹配到相应的请求。如果回调队列接收到一条未知correlationld 的回复消息,可以简单地将其丢弃。

为什么要将回调队列中的位置消息丢弃而不是仅仅将其看作失败?

RPC 服务器可能在发送给回调队列(amq.gen-LhQzlgv3GhDOv8PIDabOXA )并且在确认接收到请求的消息( rpc_queue中的消息)之后挂掉了,那么只需重启下RPC 服务器即可, RPC 服务会重新消费rpc_queue 队列中的请求,这样就不会出现RPC 服务端未处理请求的情况。这里的回调队列可能会收到重复消息的情况,这需要客户端能够优雅地处理这种情况,并且RPC 请求也需要保证其本身是幂等的(补充: 根据3. 5 节的介绍,消费者消费消息一般是先处理业务逻辑, 再使用Basic . Ack确认己接收到消息以防止消息不必要地丢失)。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_优先级_08

RPC 的处理流程如下:

(1)当客户端启动时,创建一个匿名的回调队列(名称由RabbitMQ 自动创建,图中的回调队列为amq.gen-LhQzlgv3GhDOv8PIDabOXA ) 。

(2) 客户端为RPC 请求设置2 个属性: reply T o 用来告知RPC 服务端回复请求时的目的队列,即回调队列; correlationld 用来标记一个请求。

(3)请求被发送到rpc_queue 队列中。

(4) RPC 服务端监听rpc_queue 队列中的请求,当请求到来时, 服务端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo 设定的回调队列。

(5) 客户端监昕回调队列, 当有消息时, 检查correlationld 属性,如果与请求匹配,那就是结果了。

RabbitMQ的持久化

"持久化"这个词汇在前面的篇幅中有多次提及,持久化可以提高RabbitMQ 的可靠性, 以防在异常情况(重启、关闭、右机等)下的数据丢失。本节针对这个概念做一个总结。RabbitMQ
的持久化分为三个部分:

交换器的持久化

交换器的持久化是通过在声明队列是将durable 参数置为true 实现的,如果交换器不设置持久化,那么在RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。

队列的持久化

队列的持久化是通过在声明队列时将durable 参数置为true 实现的,如果队列不设置持久化,那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。正所谓"皮之不存,毛将焉附",队列都没有了,消息又能存在哪里呢?队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失, 需要将其设置为持久化。通过将消息的投递模式(BasicPropert i es 中的deliveryMode 属性)设置为2 即可实现消息的持久化。前面示例中多次提及的MessageProperties.PERSISTENT TEXT PLAIN 实际上是封装了这个属性:

public stat 工c final BasicProperties PERSISTENT TEXT PLAIN =
new BasicProperties("text/plain",
null,
null,
2,//deliveryMode,
null,null,null,
null,null,null,null ,
null,null);

消息的持久化

设置了队列和消息的持久化,当RabbitMQ 服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

可以将所有的消息都设直为持久化,但是这样会严重影响RabbitMQ 的性能(随机)。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久
化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。

将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?

  • 首先从消费者来说,如果在订阅消费队列时将autoAck 参数设置为true ,那么当消费者接收到相关消息之后,还没来得及处理就看机了,这样也算数据丢失。这种情况很好解决,将autoAck 参数设置为false , 并进行手动确认。
  • 其次,在持久化的消息正确存入RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视〉才能存入磁盘之中。RabbitMQ 并不会为每条消息都进行同步存盘(调用内核的fsync 方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失

这个问题怎么解决呢?这里可以引入RabbitMQ 的镜像队列机制,相当于配置了副本,如果主节点(master) 在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证RabbitMQ 消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。还可以在发送端引入事务机制或者发送方确认机制来保证消息己经正确地发送并存储至RabbitMQ 中,前提还要保证在调用channel.basicPublish 方法的时候交换器能够将消息正确路由到相应的队列之中。

RabbitMQ生产者消息发送确认

在使用RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?RabbitMQ 针对这个问题,提供了两种解决方式:

通过事务机制实现

Rabb itMQ 客户端中与事务机制相关的方法有三个: channel.txSelect 、channel.txCommit 和channel.txRollbacko 、channel.txSelect 用于将当前的信道设置成事务模式. channel . txCommit 用于提交事务. channel . txRollback 用于事务回滚。在通过channel.txSelect 方法开启事务之后,我们便可以发布消息给RabbitMQ 了,如果事务提交成功,则消息一定到达了RabbitMQ 中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel . txRollback 方法来实现事务回夜。注意这里的RabbitMQ 中的事务机制与大多数数据库中的事务概念井不相同,需要注意区分。

channel . txSelect();
channel.basicPublish(EXCHANGE NAME , ROUTING KEY ,
MessageProperties . PERSISTENT TEXT_PLAIN,
"transaction messages".getBytes());
channel . txCommit();

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_优先级_09

可以发现开启事务机制与不开启(参考图2-10 ) 相比多了四个步骤:

  • 客户端发送Tx.Select. 将信道置为事务模式;
  • Broker 回复Tx. Select-Ok. 确认己将信道置为事务模式:
  • 在发送完消息之后,客户端发送Tx.Commit 提交事务;
  • Broker 回复Tx. Commi t-Ok. 确认事务提交。

事务确实能够解决消息发送方和RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会"吸干" RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP 协议层面来看并没有更好的办法,但是RabbitMQ 提供了一个改进方案,即发送方确认机。

通过发送方确认(publisher confirm ) 机制实现

前面介绍了RabbitMQ 可能会遇到的一个问题,即消息发送方(生产者〉并不知道消息是否真正地到达了RabbitMQ。随后了解到在AMQP 协议层面提供了事务机制来解决这个问题,但是采用事务机制实现会严重降低RabbitMQ 的消息吞吐量,这里就引入了一种轻量级的方式一发送方确认(publisher confirm) 机制。生产者将信道设置成confmn (确认)模式,一旦信道进入confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从l 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认CBasic.Ack) 给生产者(包含消息的唯一ID) ,这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的delivery Tag 包含了确认消息的序号,此外RabbitMQ 也可以设置channel . basicAck 方法中的multiple 参数,表示到这个序号之前的所有消息都己经得到了处理,可以参考图 。注意辨别这里的确认和消费时候的确认之间的异同。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_rabbitmq_10

事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下, 发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条nack CBas i c . Nack) 命令,生产者应用程序同样可以在回调方法中处理该nack 命令。

生产者通过调用channel . confirmSelect 方法(即Confirm.Select 命令)将信道设置为confrrm 模式,之后RabbitMQ 会返回Confirm . Select-Ok 命令表示同意生产者将当前信道设置为confirm 模式。所有被发送的后续消息都被ack 或者nack 一次,不会出现一条消息既被ack 又被nack 的情况, 并且RabbitMQ 也并没有对消息被confrrm 的快慢做任何保证。

try 
channel.confirmSelect(); //将信道置为publisher confirm 模式
//之后正常发送消息
channel.basicPublish("exchange","routingKey",null,"publisher confirm test".getBytes());
if(!channel.waitForConfirms())
System.out.println( "send message failed " ) ;
// do something else..

catch (InterruptedException e)
e.printStackTrace() ;

如果发送多条消息,需要将channel . basicPublish 和channel.waitForConfirms 方法包裹在循环里面即可,可以参考事务机制,不过不需要把channel . confirmSelect 方法包裹在循环内部。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_客户端_11

注意要点:

  • (1)事务机制和publisher ∞nfirm 机制两者是互斥的,不能共存。如果企图将已开启事务模式的信道再设直为publisher confmn 模式, RabbitMQ 会报错amqp_error , precondition_failed, " cannot switch from tx to confirm mode" , confirm .sele ct ; 或者如果企图将已开启publisher confum 模式的信道再设直为事务模式, RabbitMQ 也会报错:amqp_error, precondition_failed, "cannot switch from confirm to txmode" , tx . select 。
  • (2)事务和忡忡口publisher confum 机制确保的是消息能够正确地发送至RabbitMQ ,这里的"友送至RabbitMQ" 的含义是指消息被正确地发往至RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列. 更进一步地讲,发送方要配合mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。

publisher confmn 的优势在于并不一定需要同步确认。这里我们改进了一下使用方式,总结有如下两种:

  • 批量confirm 方法:每发送一批消息后,调用channel.waitForConfirms 方法,等待服务器的确认返回。
  • 异步confirm 方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会因,调这个方法进行处理。
  • 在批量confmn 方法中,客户端程序需要定期或者定量(达到多少条),亦或者两者结合起来调用channel .w aitForConfirms 来等待RabbitMQ 的确认返回。相比于前面示例中的普通confirm 方法,批量极大地提升了confmn 的效率,但是问题在于出现返回Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且
    当消息经常丢失时,批量confirm 的性能应该是不升反降的。

RabbitMQ消费者消费确认

消费者客户端可以通过推模式或者拉模式的方式来获取井消费消息,当消费者处理完业务逻辑需要手动确认消息己被接收,这样RabbitMQ才能把当前消息从队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过channel . basicNack 或者channel . basicReject 来拒绝掉。这里对于RabbitMQ 消费端来说,还有几点需要注意:

消息分发

当RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin )的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有n 个消费者,那么RabbitMQ会将第m 条消息分发给第m%n (取余的方式)个消费者, RabbitMQ 不管消费者是否消费并己经确认(Basic.Ack) 了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。那么该如何处理这种情况呢?这里就要用到channel.basicQos(int prefetchCount)这个方法,

在订阅消费队列之前,消费端程序调用了channel.basicQos(5) ,之后订阅了某个队列进行消费。RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后, RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于TCP/IP中的"滑动窗口"。

前面介绍的都只用到了prefetchCount 这个参数,当prefetchCount 设置为0 则表示没有上限。还有prefetchSize 这个参数表示消费者所能接收未确认消息的总体大小的上限,单位为B ,设置为0 则表示没有上限。对于一个信道来说,它可以同时消费多个队列,当设置了prefetchCount 大于0 时,这个信道需要和各个队列协调以确保发送的消息都没有超过所限定的prefetchCount 的值,这样会使RabbitMQ 的性能降低,尤其是这些队列分散在集群中的多个Broker 节点之中。RabbitMQ 为了提升相关的性能,在AMQPO-9-1 协议之上重新定义了global 这个参数。

RabbitMQ——RabbitMQ的高级特性(TTL、死信队列、延迟队列、优先级队列、RPC)_rabbitmq_12

消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为msgl 、msg2 、msg3 ,那么消费者必然
也是按照msgl 、msg2 、msg3 的顺序进行消费的。目前很多资料显示RabbitMQ 的消息能够保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何RabbitMQ 的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达Broker 的前后顺序,也就无法验证消息的顺序性。

那么哪些情况下RabbitMQ 的消息顺序性会被打破呢?下面介绍几种常见的情形。

  • 如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。同样,如果启用publisher confirrn 时,在发生超时、中断,又或者是收到RabbitMQ 的Basic.Nack 命令时,那么同样需要补偿发送,结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性保障是从存入队列之后开始的,而不是在发迭的时候开始的。
  • 考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,井且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
  • 再考虑一种情形,如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。如果一个队列按照前后顺序分有msg1 , msg2 、msg3 、msg4 这4 个消息,同时有ConsumerA和ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为msg1 和msg3 , ConsumerB 中的消息为msg2 、msg4 0 ConsumerA 收到消息msg1 之后并不想处理而调用了Basic.Nack/.Reject 将消息拒绝,与此同时将requeue 设置为true ,这样这条消息就可以重新存入队列中。消息msg1 之后被发送到了ConsumerB 中,此时ConsumerB 己经消费了msg2 、msg4 ,之后再消费msg 1.这样消息顺序性也就错乱了。或者消息msg1 又重新发往ConsumerA 中,此时ConsumerA 己经消费了msg3 ,那么再消费msg1 ,消息顺序性也无法得到保障。同样可以用在Basic.Recover 这个AMQP命令中。

包括但不仅限于以上几种情形会使RabbitMQ 消息错序。如果要保证消息的顺序性,需要业务方使用RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似Sequence
ID) 来实现。

RabbitMQ的消息可靠性

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。

  • At most once: 最多一次。消息可能会丢失,但绝不会重复传输。:生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失。
  • At least once: 最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次。:"恰好一次"是RabbitMQ 目前无法保障的。考虑这样一种情况,消费者在消费完一条消息之后向RabbitMQ 发送确认Basic.Ack 命令,此时由于网络断开或者其他原因造成RabbitMQ并没有收到这个确认命令,那么RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。再考虑一种情况,生产者在使用publisher confirm 机制的时候,发送完一条消息等待RabbitMQ 返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ 中就有两条同样的消息,在消费的时候,消费者就会重复消费。

RabbitMQ 支持其中的"最多一次"和"最少一次"。其中"最少一次"投递实现需要考虑以下这个几个方面的内容:

(1)消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到RabbitMQ 中。

(2) 消息生产者需要配合使用mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。

(3) 消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。

(4) 消费者在消费消息的同时需要将autoAck 设置为false ,然后通过手动确认的方式去确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失。

那么RabbitMQ 有没有去重的机制来保证"恰好一次"呢?

不仅是RabbitMQ ,目前大多数主流的消息中间件都没有消息去重机制,也不保障"恰好一次"。去重处理一般是在业务客户端实现,比如引入GUID (Globally Unique Identifier) 的概念。针对GUID ,如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备军等性,或者借助Redis 等其他产品进行去重处理。

RabbitMQ RabbitMQ高级特性

消息如何保障100%的投递成功

什么是生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Borker)确认应答
  • 完善的消息进行补偿机制

生产端-可靠性投递(一)

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次确认,回调检查

生产端-可靠性投递(二)

消息落库,对消息状态进行打标
技术图片

消息落库,对消息进行打标(对消息设置状态,发送中,broker收到,)

定时器轮训,检测未发送的消息,进行二次投递,最大努力尝试(设置最大次数)

  • step1 消息落库(唯一的消息id) ,一定是数据库入库成功以后在进行发送消息
  • step2 发送消息 到MQ Broker
  • step3 Broker Confirm (发送消息确认)
  • step4 生产者ConfirmListener (异步监听,Broker回送的响应)
  • step5 成功,通过messageId更新消息状态

补偿

分布式定时任务,抓取数据(超过第一时长),尝试重发,重试次数限制

生产端-可靠性投递(三)

消息的延迟投递,做二次确认,回调检查 (最大限度的减少消息落库)
技术图片
方案一在高并发场景下,每次消息落库,影响性能(IO操作)

step1: 业务消息落库 ,一定是数据库入库成功以后在进行发送消息

step2:第一次消息的发送

step3:延迟消息的检测

step4:监听,处理完,生成一条新消息

step5:通过队列发送,确认 不是之前的ack

幂等性概念

幂等性是什么

借鉴数据库的乐观锁机制

执行一条更新SQL

 update t_reps set count=count-1,version=version+1  where verison=1

消费端幂等性保障

在业务高峰期,如何避免消息的重复消费问题

消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即时收到多条一样的消息。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错
https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-that-messages-are-not-repeatedly-consumed.md

业界主流的幂等性操作

  • 唯一ID+指纹码机制,利用数据库主键去重
  • 利用Redis的原子性实现

方案一

唯一ID+指纹码机制

  • 唯一ID+指纹码机制,利用数据库主键去重
  • select count(1) from t_order where id=唯一id+指纹码
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:跟进ID进行分库分表进行路由算法

方案二:利用Redis的原子性实现

Confirm确认消息

理解Confirm消息确认机制:

  • 消息的去人,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答
  • 生产者进行接收应该,用来确定这条消息是否正常的发送到broker,这种方式也是消息的可靠性投递的核心保障

confirm确认消息流程解析
技术图片
confirm确认消息实现

  • 第一步:在channel上开启确认模式: channel.sconfirmSelect()
  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

示例

生产者

/**
 * @author niugang
 */
public class Producer {
	public static void main(String[] args) throws Exception {		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 获取C	onnection
		Connection connection = connectionFactory.newConnection();
		
		//3 通过Connection创建一个新的Channel
		Channel channel = connection.createChannel();		
		//4 指定我们的消息投递模式: 消息的确认模式 
		channel.confirmSelect();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.save";
		
		//5 发送一条消息
		String msg = "Hello RabbitMQ Send confirm message!";
		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
		
		//6 添加一个确认监听
		channel.addConfirmListener(new ConfirmListener() {
			//失败
			// deliveryTag 消息唯一标签
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------no ack!-----------");
			}

			//成功
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------ack!-----------");
			}
		});		
	}
}

消费者

/**
 * @author niugang
 */
public class Consumer {

	
	public static void main(String[] args) throws Exception {
		
		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 获取C	onnection
		Connection connection = connectionFactory.newConnection();
		
		//3 通过Connection创建一个新的Channel
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.#";
		String queueName = "test_confirm_queue";
		
		//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
		channel.exchangeDeclare(exchangeName, "topic", true);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//5 创建消费者 
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			
			System.err.println("消费端: " + msg);
		}
		
		
	}
}

Return消息机制

  • Return Listener 用于处理一些不可路由的消息。
  • 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中取,然后我们的消费者监听队列,进行消费处理操作。
  • 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

配置

在基础API上有一个关键的配置项

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

流程
技术图片
生产者

/**
 * @author niugang
 */
public class Producer {

	public static void main(String[] args) throws Exception {
		
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_return_exchange";
		String routingKey = "return.save";
		String routingKeyError = "abc.save";
		
		String msg = "Hello RabbitMQ Return Message";
		
		
		channel.addReturnListener(new ReturnListener() {
			@Override
			public void handleReturn(int replyCode, String replyText, String exchange,
					String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
				
				System.err.println("---------handle  return----------");
				//响应码 312
				System.err.println("replyCode: " + replyCode);
				//NO_ROUTE
				System.err.println("replyText: " + replyText);
				System.err.println("exchange: " + exchange);
				System.err.println("routingKey: " + routingKey);
				System.err.println("properties: " + properties);
				System.err.println("body: " + new String(body));
			}
		});
		
		
		channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
		
		//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());	
	}
}

消费者

/**
 * @author niugang
 */
public class Consumer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_return_exchange";
		String routingKey = "return.#";
		String queueName = "test_return_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);	
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.err.println("消费者: " + msg);
		}	
	}
}

消费者自定义监听

  • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法获取下一条消息,然后进行消费处理!
  • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!

实现方式

自定义类,继承 DefaultConsumer
技术图片
生产者

/**
 * @author niugang
 */
public class Producer {

	
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_consumer_exchange";
		String routingKey = "consumer.save";
		
		String msg = "Hello RabbitMQ Consumer Message";
		
		for(int i =0; i<5; i ++){
			channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
		}
		
	}
}

自定义消费者

/**
 * 自定义消费者
 * @author niugang
 */
public class MyConsumer extends DefaultConsumer {


	public MyConsumer(Channel channel) {
		super(channel);
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		//消费标签
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
	}


}

消费者

/**
 * @author niugang
 */
public class Consumer {

	
	public static void main(String[] args) throws Exception {
		
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		
		String exchangeName = "test_consumer_exchange";
		String routingKey = "consumer.#";
		String queueName = "test_consumer_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		channel.basicConsume(queueName, true, new MyConsumer(channel));
		
	
	}
}

消费端限流

什么是消费端的限流

  • 假设一个场景,首先,我们RabbitMQ服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
  • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据;
  • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
  • void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);
  • prefetchSize:0 #这里为0表示不限制
  • prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack; (prefetchCount等于1即可)
  • global:truefalse 是否将上面设置应用于channel
  • 简单来说,就是上面限制是channel级别的还是consumer级别;

生产者

/**
 * @author niugang
 */
public class Producer {

	
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhosy");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_qos_exchange";
		String routingKey = "qos.save";
		
		String msg = "Hello RabbitMQ QOS Message";
		
		for(int i =0; i<5; i ++){
			channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
		}
		
	}
}

自定义消费者

public class MyConsumer extends DefaultConsumer {


	private Channel channel ;
	
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
		//ack 注释掉后 控制台只会接收到一条消息
		channel.basicAck(envelope.getDeliveryTag(), false);
		
	}

}

消费者

/**
 * @author niugang
 */
public class Consumer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String exchangeName = "test_qos_exchange";
		String queueName = "test_qos_queue";
		String routingKey = "qos.#";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//1 限流方式  第一件事就是 autoAck设置为 false
		//prefetchCount broker 给 消费者 最大推送消息数量
		channel.basicQos(0, 1, false);
		//手工签收
		channel.basicConsume(queueName, false, new MyConsumer(channel));	
	}
}

消息ACK与重回队列

消费端的手工ACK和NACK

  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功

消费端的重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker (requeue属性设置)

  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;

生产者

/**
 * ack 测试生产者
 * @author niugang
 */
public class Producer {
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String exchange = "test_ack_exchange";
		String routingKey = "ack.save";
		for(int i =0; i<5; i ++){
			
			Map<String, Object> headers = new HashMap<String, Object>();
			headers.put("num", i);
			//设置消息属性
			AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
					.deliveryMode(2).expiration("1000")
					.contentEncoding("UTF-8")
					.headers(headers)
					.build();
			String msg = "Hello RabbitMQ ACK Message " + i;
			channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
		}
		
	}
}

自定义消费者

/**
 * 自定义消费者
 * @author niugang
 */
public class MyConsumer extends DefaultConsumer {


	private Channel channel ;
	
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("body: " + new String(body));
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if((Integer)properties.getHeaders().get("num") == 0) {
			//multiple 是否是批量
			//requeue 重新添加到队列尾部
			channel.basicNack(envelope.getDeliveryTag(), false, true);
		} else {
			channel.basicAck(envelope.getDeliveryTag(), false);
		}
		
	}
}

消费者

/**
 * 消费者
 * @author niugang
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        // 手工签收 必须要关闭 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }
}

TTL队列/消息

TTL

  • TTL是Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的删除。

在控制台创建队列
技术图片
在控制台创建exchange 并添加binding 然后发送消息,然后在队列页面可以看到消息自动被队列剔除
技术图片
原生API设置TTL

	AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
					.expiration("1000").build();

Spring AMQP设置TTL

MessageProperties messageProperties = new MessageProperties();
 //消息过期时间
messageProperties.setExpiration("1000");
Message stringMessage = new Message("Hello Springboot RabbitMQ".getBytes(), messageProperties);

死信队列

死信队列:DLX ,Dead-Letter-Exchange

  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX;

消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject/basic.nack) 并且requeue=false;
  • 消息TTL过期
  • 队列达到最大长度

死信队列详细解释

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能;

死信队列具体设置

step1:首先需要设置死信队列的exchange和queue,然后进行绑定

例如定义如下exchange和queue

  • Exchange:dlx.echange
  • Queue:dlx.queue
  • RoutingKey:#

step2:

然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”);

这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。

技术图片
生产者

/**
 * 私信队列 生产端
 *
 * @author niugang
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //自定义普通的exchange
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        String msg = "Hello RabbitMQ DLX Message";
        for (int i = 0; i < 1; i++) {
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    //过期时间为10s
                    .expiration("10000")
                    .build();
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }

    }
}

自定义消息消费

/**
 * 自定义消息消费
 *
 * @author niugang
 */
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

生产者

/**
 * 死信队列消费端
 *
 * @author niugang
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 这就是一个普通的交换机 和 队列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);

        Map<String, Object> agruments = new HashMap<String, Object>(16);
        //设置死信队列exchange  这些具体的参数可以通过rabbitmq控制台查看
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        //要进行死信队列的声明:
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

技术图片


















以上是关于RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ RabbitMQ高级特性

RabbitMQ RabbitMQ高级特性

消息队列 RabbitMq高级特性

RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)

RabbitMQ高级特性