Spring Cloud Stream整合Rabbit之重复投递
Posted androidstarjack
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream整合Rabbit之重复投递相关的知识,希望对你有一定的参考价值。
点击上方关注 “终端研发部”
设为“星标”,和你一起掌握更多数据库知识
SpringCloudStream
整合Rabbit
时,消费端在处理失败时,如果需要进行重试,可以有如下几种重试机制:
方法1(默认):
当消费端在处理消息时抛出异常,那么默认会在当前线程的3次的Retry。该方法是默认的,可以通过修改配置文件,指定channel下的参数,例如:
|
|
其中:
max-attempts
如果等于1,就是不重试;max-attempts
如果大于1,其值就是重试次数。
当消息重试超过最大次数,如果未配置启用DLQ
,消息将会被丢弃。该方法默认是无法设置重试的时间间隔的。
方法2:
方法1是在当前线程进行重试,相当于阻塞了后面的消息,有时我们不想阻塞,则可以利用死信队列(Dead Letter Queue, 缩写DLQ
),进行异步重试。
先看一下DLQ
的逻辑。
|
|
设置spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq
参数为true,将自动创建对应channel的DLQ
,绑定死信交换机(Dead Letter Exchange, 缩写DLX
)。默认该queue
的名字就是其对应destination
.group
后追加.dlq
,同时,该进入该queue
的消息的routingKey
即为原destination
。
按上面的配置,消息进入DLQ
以后,因为没有任何的消费者,消息会一直存储于DLQ
中,可以添加dlqTtl
参数设置消息在DLQ
中生存的时间,在无消费者的情况下,默认到期后会删除该消息。
如果想指定DLQ
的名称,可以用deadLetterQueueName
参数指定。
重试的逻辑其实就是利用DLQ
,给其设置一个默认的exchange
,在TTL
时间到期后,消息会再度转到指定的exchange
对应的queue
中。
为了实现该逻辑,需要配置三个参数:
autoBindDql
设置为true,启用DLQ
dlqTtl
设置一个死信消息超时时间,变相实现了重试的间隔时间dlqDeadLetterExchange
增加该参数后,留空即为设置默认值。在默认值情况下,DLQ
中的消息将会按照其routingKey
的值(也可由deadLetterRoutingKey
参数指定),将消息投递到给名称对应该值的quque
,实现消息的重新消费。
例如:
|
|
注意:
因为在配置中,设置了
group
这个参数,当该参数使用时,默认rabbit的durable
参数是启用的,即该channel的exhange
和queue
是持久化的,应用退出后,不会自动清除,并且保存创建时的参数。所以当改变了channel的参数后,需要将queue删除,让其自动重建,否则新改的不会生效,则无法实现自动重试。
按照上面配置,删除旧queue
后重新启动应用,创建queue
信息如下:
消息重新投递后,在其header
里,会增加一些重试的信息,如下图所示:
deliveryAttempt
值代表在当前线程的重试次数,即方法一的重试逻辑x-death
头记录了重试循环的一些详细信息,尤其是值count
记录了经由DLQ
异步重试的次数。
但有时,我们想知道上一次错误的具体异常,此时可以增加republishToDlq
参数,当设置为true时,会在消息头里增加详细的异常和异常堆栈信息。
注:
当
republishToDlq
设置为不同值时,routingKey
的取值逻辑不同。当为false时,取的是x-death
头中第一个的routing-keys
值;当为true时,取得是X_ORIGINAL_ROUTING_KEY_HEADER
这个Header的值。
此时,该消息将不断重复queue -> DLQ -> queue的循环(假设消费端一直拒绝或抛异常)。如果我们想设置重试次数大于3就不再重试,可以抛出ImmediateAcknowledgeAmqpException
这个异常,则该消息被丢弃,不再进入DLQ
。
关于消息的拒绝
前面对于消息的拒绝,都是采用抛异常,但是这个异常不能乱抛。不同的异常,框架处理的方式不同:
普通的异常,等同于
AmqpRejectAndDontRequeueException
,会导致消息重试ImmediateAcknowledgeAmqpException
这个异常,会导致消息被丢弃不触发重试
有时候,我们不期望在生产的日志中出现重试的ERROR,可以考虑用下面的方案:
将消费端的
acknowledgeMode
从默认的自动改为手动,即acknowledgeMode: MANUAL
将channel注入到消费端,手动处理,例如:
|
|
转自:Edison Xu
链接:http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html
写在最后的话
大家看完有什么不懂的可以在下方留言讨论,也可以私信问我一般看到后我都会回复的。最后觉得文章对你有帮助的话记得点个赞哦,点点关注不迷路
@终端研发部
每天都有新鲜的干货分享!
回复 【idea激活】即可获得idea的激活方式
回复 【Java】获取java相关的视频教程和资料
回复 【SpringCloud】获取SpringCloud相关多的学习资料
回复 【python】获取全套0基础Python知识手册
回复 【2020】获取2020java相关面试题教程
回复 【加群】即可加入终端研发部相关的技术交流群
阅读更多
用 Spring 的 BeanUtils 前,建议你先了解这几个坑!
lazy-mock ,一个生成后端模拟数据的懒人工具
在华为鸿蒙 OS 上尝鲜,我的第一个“hello world”,起飞!
字节跳动一面:i++ 是线程安全的吗?
一条 SQL 引发的事故,同事直接被开除!!
太扎心!排查阿里云 ECS 的 CPU 居然达100%
一款vue编写的功能强大的swagger-ui,有点秀(附开源地址)
相信自己,没有做不到的,只有想不到的在这里获得的不仅仅是技术!
点“在看”支持小于哥呀,谢谢啦
以上是关于Spring Cloud Stream整合Rabbit之重复投递的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Stream整合Rabbit之重复投递
微服务架构 | 8.1 使用 Spring Cloud Stream 整合 Apache kafka #yyds干货盘点#