使用RabbitMQ处理死信队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ处理死信队列相关的知识,希望对你有一定的参考价值。

参考技术A

DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。
在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。

以下几种情况导致消息变为死信:

对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

pom.xml添加依赖

application.properties配置RabbitMQ连接信息

启动类实现

RabbitConfig类实现

GoController类实现

RabbitMQ项目使用之死信队列

消息消费失败处理方式:

一 进入死信队列(进入死信的三种方式)

1.消息被拒绝(basic.reject or basic.nack)并且requeue=false

2.消息TTL过期过期时间

3.队列达到最大长度

DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理, 这个特性可以弥补R abbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。



rabbitmq的三种模式:

一. Fanout Exchange  广播

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

二. Direct Exchange  点对点

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

三. Topic Exchange  模糊匹配

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。

 

死信队列实现:

   详见官网说明: http://www.rabbitmq.com/ttl.html

在声明期间使用x参数为队列定义消息TTL

 如:spring 中配置如下:

<rabbit:connection-factory id="connectionFactory" host="47.104.203.101" password="admin"
username="admin" port="5672"
channel-cache-size="30" virtual-host="/citpay"
publisher-confirms="true"
publisher-returns="true"/>


<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true"
confirm-callback="payOrderConfirmCallBack"
return-callback="payOrderReturnCallBack"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue id="pay_order_queue" name="citpay.pay_order_queue" durable="true" auto-delete="false" >
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">5000</value>
</entry>
<entry key="x-dead-letter-exchange">
<value type="java.lang.String">citpay.pay_order_dead_exchange</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>

<rabbit:direct-exchange name="citpay.direct_pay_order_exchange" id="direct_pay_order_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="citpay.pay_order_queue" key="pay_order_routekey"/>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:queue id="pay_dead_order_queue" name="citpay.pay_dead_order_queue" durable="true" auto-delete="false"></rabbit:queue>

<rabbit:direct-exchange name="citpay.pay_order_dead_exchange" id="pay_order_dead_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="citpay.pay_dead_order_queue" key="pay_order_routekey"/>
</rabbit:bindings>
</rabbit:direct-exchange>


<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" max-concurrency="1" >
<rabbit:listener queues="citpay.pay_dead_order_queue"
ref="payOrderDeadReceiveConfirmListener" />
</rabbit:listener-container>



使用策略为队列定义队列TTL

  命令形式:
rabbitmqctl set_policy expiry“。*”‘{“expires”:1800000}‘ - apply-to queues

        管理控制台设置策略:

  技术分享图片

 























































以上是关于使用RabbitMQ处理死信队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ项目使用之死信队列

Rabbitmq消费失败死信队列

RabbitMQ项目使用之死信队列

RabbitMQ中的死信队列

rabbitmq死信队列及延迟队列

RabbitMQ-死信队列