在 Spring Cloud Stream Kafka 中正确管理 DLQ

Posted

技术标签:

【中文标题】在 Spring Cloud Stream Kafka 中正确管理 DLQ【英文标题】:Correctly manage DLQ in Spring Cloud Stream Kafka 【发布时间】:2018-12-17 05:31:24 【问题描述】:

我想使用 kafka 在 Spring Cloud Stream 中管理一个 DLQ。

application.yaml

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                binder:
                    brokers: IP:9092
                bindings:
                    greetings-in:
                        destination: greetings
                        contentType: application/json
                    greetings-out:
                        destination: greetings
                        contentType: application/json
            bindings:
                greetings-out:
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
    kafka:
      consumer:
        group-id: A

正如您在我的配置中看到的,我启用了 dlq 并为 dlq 主题设置了一个名称。

为了测试 DLQ 行为,我在某些消息上抛出异常

我的监听器组件

@StreamListener("greetings-out")
    public void handleGreetingsInput(@Payload Greetings greetings) throws Exception 
        logger.info("Greetings input -> ", greetings);
        if (greetings.getMessage().equals("ciao")) 
            throw new Exception("eer");
        
    

这样,等于“ciao”的消息抛出异常,在日志中我看到它被处理了3次

2018-07-09 13:19:57.256  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@3da9d701[timestamp=0,message=ciao]
2018-07-09 13:19:58.259  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@5bd62aaf[timestamp=0,message=ciao]
2018-07-09 13:20:00.262  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@c26f92b[timestamp=0,message=ciao]
2018-07-09 13:20:00.266 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.mitro.service.GreetingsListener#handleGreetingsInput[1 args]; nested exception is java.lang.Exception: eer, failedMessage=GenericMessage [payload=byte[32], headers=kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@510302cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=greetings-out, kafka_receivedTimestamp=1531142397248]

这对我来说很好,但我不明白为什么会创建一个名为 dead-out 的主题(请看下图)。

我做错了什么?

编辑 1:(仍然没有为 DLQ 创建主题)

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                streams:
                    binder:
                        serdeError: sendToDlq
                binder:
                    brokers: IP:9092
                    auto-create-topics: true
                bindings:
                    greetings-out:
                        destination: greetings-out
                        contentType: application/json
                        consumer:
                          enableDql: true
                          dlqName: dead-out
                          autoCommitOnError: true
                          autoCommitOffset: true
            bindings:
                greetings-out:
                    destination: greetings-out
                    contentType: application/json
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
                        autoCommitOnError: true
                        autoCommitOffset: true
    kafka:
      consumer:
        group-id: A

【问题讨论】:

【参考方案1】:

看起来你的属性被颠倒了;公共属性 - 目的地、内容类型 - 必须在 spring.cloud.stream.bindings 下。特定于 kafka 的属性(enableDlq、dlqName)必须在 spring.clound.stream.kafka.bindings 下。

你把它们颠倒了。

编辑

您的(修改后的)配置存在两个问题。

    错字enableDql 而不是enableDlq 没有组 - 您不能与匿名消费者建立 DLQ:

原因:java.lang.IllegalArgumentException:DLQ 支持不适用于匿名订阅

这很好用:

spring:
  application:
    name: employee-consumer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
        bindings:
          input:
            consumer:
              enableDlq: true
              dlqName: dead-out
              autoCommitOnError: true
              autoCommitOffset: true
      bindings:
        input:
          group: so51247113
          destination: greetings-out
          contentType: application/json

@SpringBootApplication
@EnableBinding(Sink.class)
public class So51247113Application 

    public static void main(String[] args) 
        SpringApplication.run(So51247113Application.class, args);
    

    @StreamListener(Sink.INPUT)
    public void in(String in) 
        System.out.println(in);
        throw new RuntimeException("fail");
    

    @KafkaListener(id = "foo", topics = "dead-out")
    public void dlq(Message<?> in) 
        System.out.println("DLQ:" + in);
    


【讨论】:

我按照你说的颠倒了属性,我得到了这个错误:2018-07-10 10:08:05.232 ERROR 1 --- [main] oscloud.stream.binding.BindingService : Failed to创建消费者绑定;在 30 秒内重试 org.springframework.cloud.stream.binder.BinderException:启动消费者时抛出异常: 用新的 yml 编辑问题,我试图让它与更多的属性一起工作,但仍然没有为 DLQ 创建新的主题 您仍然缺少 DLQ 属性的 kafka: 元素。请再次阅读我的回答。 哦;我懂了;是的;混淆在这两个地方都有它。让我复制你的配置并尝试一下...... 查看我的答案的编辑。我不知道你这里的划分是什么意思。

以上是关于在 Spring Cloud Stream Kafka 中正确管理 DLQ的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题

spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

一文了解Spring Cloud Stream体系

spring cloud stream

Spring Cloud Stream异常处理

spring cloud-stream 和 spring cloud-bus 有啥区别?