Spring Cloud Stream整合Rabbit之重复投递

Posted androidstarjack

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream整合Rabbit之重复投递相关的知识,希望对你有一定的参考价值。

点击上方关注 “终端研发部


 

设为“星标”,和你一起掌握更多数据库知识

SpringCloudStream 整合Rabbit 时,消费端在处理失败时,如果需要进行重试,可以有如下几种重试机制:

方法1(默认):

当消费端在处理消息时抛出异常,那么默认会在当前线程的3次的Retry。该方法是默认的,可以通过修改配置文件,指定channel下的参数,例如:

1
2
3
4
5
6
7
8
9
10
spring:
  cloud:
    stream:
      bindings:
        input-test-event:
          destination: test-event
          group: test-group
          binder: rabbit
          consumer:
            max-attempts: 1

其中:

  • max-attempts 如果等于1,就是不重试;

  • max-attempts 如果大于1,其值就是重试次数。

当消息重试超过最大次数,如果未配置启用DLQ ,消息将会被丢弃。该方法默认是无法设置重试的时间间隔的。

方法2:

方法1是在当前线程进行重试,相当于阻塞了后面的消息,有时我们不想阻塞,则可以利用死信队列(Dead Letter Queue, 缩写DLQ ),进行异步重试。

先看一下DLQ 的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
  cloud:
    stream:
      bindings:
        input-test-event:
          destination: test-event
          group: test-group
          binder: rabbit
      rabbit:
        bindings:
          input-test-event:
            consumer:
              autoBindDlq: true

设置spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq 参数为true,将自动创建对应channel的DLQ ,绑定死信交换机(Dead Letter Exchange, 缩写DLX )。默认该queue的名字就是其对应destination.group 后追加.dlq ,同时,该进入该queue 的消息的routingKey 即为原destination 。

按上面的配置,消息进入DLQ 以后,因为没有任何的消费者,消息会一直存储于DLQ 中,可以添加dlqTtl 参数设置消息在DLQ中生存的时间,在无消费者的情况下,默认到期后会删除该消息。

如果想指定DLQ的名称,可以用deadLetterQueueName 参数指定。

重试的逻辑其实就是利用DLQ ,给其设置一个默认的exchange ,在TTL 时间到期后,消息会再度转到指定的exchange 对应的queue 中。

为了实现该逻辑,需要配置三个参数:

  • autoBindDql 设置为true,启用DLQ

  • dlqTtl 设置一个死信消息超时时间,变相实现了重试的间隔时间

  • dlqDeadLetterExchange 增加该参数后,留空即为设置默认值。在默认值情况下,DLQ 中的消息将会按照其routingKey的值(也可由deadLetterRoutingKey参数指定),将消息投递到给名称对应该值的quque ,实现消息的重新消费。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
  cloud:
    stream:
      bindings:
        input-test-event:
          destination: test-event
          group: test-group
          binder: rabbit
      rabbit:
        bindings:
          input-test-event:
            consumer:
              autoBindDlq: true 
              dlqTtl: 5000 
              dlqDeadLetterExchange:

注意:

因为在配置中,设置了group 这个参数,当该参数使用时,默认rabbit的durable 参数是启用的,即该channel的exhange 和queue 是持久化的,应用退出后,不会自动清除,并且保存创建时的参数。所以当改变了channel的参数后,需要将queue删除,让其自动重建,否则新改的不会生效,则无法实现自动重试。

按照上面配置,删除旧queue 后重新启动应用,创建queue 信息如下:

消息重新投递后,在其header 里,会增加一些重试的信息,如下图所示:

  • deliveryAttempt 值代表在当前线程的重试次数,即方法一的重试逻辑

  • x-death 头记录了重试循环的一些详细信息,尤其是值count 记录了经由DLQ 异步重试的次数。

但有时,我们想知道上一次错误的具体异常,此时可以增加republishToDlq 参数,当设置为true时,会在消息头里增加详细的异常和异常堆栈信息。

注:

当 republishToDlq 设置为不同值时,routingKey 的取值逻辑不同。当为false时,取的是x-death 头中第一个的routing-keys 值;当为true时,取得是X_ORIGINAL_ROUTING_KEY_HEADER 这个Header的值。

此时,该消息将不断重复queue -> DLQ -> queue的循环(假设消费端一直拒绝或抛异常)。如果我们想设置重试次数大于3就不再重试,可以抛出ImmediateAcknowledgeAmqpException 这个异常,则该消息被丢弃,不再进入DLQ

关于消息的拒绝

前面对于消息的拒绝,都是采用抛异常,但是这个异常不能乱抛。不同的异常,框架处理的方式不同:

  • 普通的异常,等同于AmqpRejectAndDontRequeueException ,会导致消息重试

  • ImmediateAcknowledgeAmqpException 这个异常,会导致消息被丢弃不触发重试

有时候,我们不期望在生产的日志中出现重试的ERROR,可以考虑用下面的方案:

  1. 将消费端的acknowledgeMode 从默认的自动改为手动,即 acknowledgeMode: MANUAL

  2. 将channel注入到消费端,手动处理,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
@EnableBinding(TestSink.class)
public class TestConsumer 

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
    private static final Long MAX_RETRY = Long.valueOf(3L);
    @StreamListener(TestSink.INPUT)
    public void consume(Message message,
                        @Header(name = AmqpHeaders.CHANNEL, required = false) Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag,
                        @Header(name = "x-death", required = false) Map<?,?> death) throws IOException 
        logger.info("收到消息:", message);

        if(death!=null && death.get("count")!=null && Long.valueOf(death.get("count").toString()).compareTo(MAX_RETRY)>=0)
            logger.error("放弃该消息");
            channel.basicAck(deliveryTag, false);
            return;
        
        //c
        channel.basicReject(deliveryTag, false);
    

转自:Edison Xu

链接:http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html


 

写在最后的话

大家看完有什么不懂的可以在下方留言讨论,也可以私信问我一般看到后我都会回复的。最后觉得文章对你有帮助的话记得点个赞哦,点点关注不迷路
@终端研发部
每天都有新鲜的干货分享!

回复 【idea激活】即可获得idea的激活方式
回复 【Java】获取java相关的视频教程和资料
回复 【SpringCloud】获取SpringCloud相关多的学习资料
回复 【python】获取全套0基础Python知识手册
回复 【2020】获取2020java相关面试题教程
回复 【加群】即可加入终端研发部相关的技术交流群
阅读更多
用 Spring 的 BeanUtils 前,建议你先了解这几个坑!

lazy-mock ,一个生成后端模拟数据的懒人工具

在华为鸿蒙 OS 上尝鲜,我的第一个“hello world”,起飞!

字节跳动一面:i++ 是线程安全的吗?

一条 SQL 引发的事故,同事直接被开除!!

太扎心!排查阿里云 ECS 的 CPU 居然达100%

一款vue编写的功能强大的swagger-ui,有点秀(附开源地址)


相信自己,没有做不到的,只有想不到的在这里获得的不仅仅是技术!



点“在看”支持小于哥呀,谢谢啦

以上是关于Spring Cloud Stream整合Rabbit之重复投递的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream整合Rabbit之重复投递

微服务架构 | 8.1 使用 Spring Cloud Stream 整合 Apache kafka #yyds干货盘点#

Spring Cloud(12)——基于Kafka的Stream实现

Spring cloud stream入门介绍

NodeJS Cloud外部配置AUTO REFRESH。

spring-cloud-stream 请求-回复消息模式