kafka消费者中死信队列的好选择是啥

Posted

技术标签:

【中文标题】kafka消费者中死信队列的好选择是啥【英文标题】:What is the good choice for Dead letter queue in kafka consumer [closed]kafka消费者中死信队列的好选择是什么 【发布时间】:2020-02-03 11:24:20 【问题描述】:

我正在写一个卡夫卡消费者。消费者的工作主要是关于创建多个数据库实体并在处理有效负载后保存它们。我正在尝试编写代码来处理使用数据时可能发生的错误。 为此,我可以想到 2 个选项(在 Spring 生态系统中)

    将失败的消息发送到 dead-letter-kafka-topic 将失败消息发送到新的数据库表(错误表)

失败的消息需要重新处理。

案例 1: 我还要再写一个@KafkaListner,它会监听死信主题并处理消息。这里的问题是我无法更好地控制如何启动重新处理流程。 (就像一个调度器)因为KafkaListener会在数据一发布到dead letter topic中就开始处理数据。

情况 2: 我可以更好地控制重新处理流程,因为我可以编写一个 REST 端点或调度程序来尝试重新处理失败的消息。 (在这里我对使用哪个数据库感到两难。关系或一些键值存储)

我基本上陷入了设计困境,无法确定 Spring 生态系统中哪种方法更好。

感谢您的回复。

【问题讨论】:

【参考方案1】:

我认为使用 Kafka 是最好的解决方案。

因为 KafkaListener 会在死信主题一发布数据就开始处理数据。

您可以通过在该侦听器上将 autoStartup 设置为 false 来控制行为,然后根据需要使用 KafkaListenerEndpointRegistry 启动/停止侦听器:

registry.getListenerContainer(myListenerId).start();

或者您可以使用自己的KafkaConsumer(由消费者工厂创建)并根据需要轮询尽可能多的记录,并在完成后关闭消费者。

【讨论】:

非常感谢!我有这个困境,因为在我们的企业中,kafka 消息保留期最长为 72 小时。整个 kafka 平台由不同的团队管理。因此,我想到了将失败的消息保存到 DB 的想法。【参考方案2】:

我同意 Gary Russell 的回答,您可以创建 KafkaConsumer 实例并控制其生命周期。该类来自org.apache.kafka:kafka-clients 库。

在您的特定情况下,您可以添加Thread.sleep(schedulerDelay) 来实现调度。这是简化的示例:

@Component
class Scheduler() 

  public void init() 
    // create kafka consumer connected to your DLQ topic
  

  public void run() 
    try 
      while (running) 
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
          processRecordLogicGoesHere(record);
        Thread.sleep(schedulerDelay);
      
     finally 
      consumer.close();
    
  


schedulerDelay 应小心拾取以跟上传入的消息,并且不要让它们因 Kafka 的日志清理策略而丢失。

关于如何使用 Kafka 的官方 API 的教程很多,这里是其中之一:Introducing the Kafka Consumer

另外,你可以在这里找到一些想法:Retrying consumer architecture in the Apache Kafka

【讨论】:

从概念上讲,使用裸KafkaConsumerPollingConsumer 模式的实现。 Spring 的 KafkaListenerEvent-Driven Consumer 模式的实现。这两个来自企业集成模式书。 非常感谢您的帮助!将对此进行调查。

以上是关于kafka消费者中死信队列的好选择是啥的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ实战-死信队列

RabbitMQ中的死信队列

RocketMQ:死信队列和消息幂等

RocketMQ的死信队列

RocketMQ的死信队列

MQ死信队列