Kafka 消费者异常和偏移提交

Posted

技术标签:

【中文标题】Kafka 消费者异常和偏移提交【英文标题】:Kafka consumer exception and offset commits 【发布时间】:2017-08-22 10:37:50 【问题描述】:

我一直在尝试为 Spring Kafka 做一些 POC 工作。具体来说,我想尝试在 Kafka 中消费消息时处理错误的最佳实践。

我想知道是否有人能够提供帮助:

    围绕 Kafka 消费者应该做的事情分享最佳实践 当出现故障时 帮助我了解 AckMode Record 的工作原理,以及在侦听器方法中引发异常时如何防止提交到 Kafka 偏移队列。

2的代码示例如下:

鉴于 AckMode 设置为 RECORD,根据documentation:

在监听器处理完之后返回时提交偏移量 记录。

如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的代码/配置/命令组合对其进行测试时,情况并非如此。偏移量仍然会更新,并且会继续处理下一条消息。

我的配置:

    private Map<String, Object> producerConfigs() 
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;


   @Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;

我的代码:

@Component
public class KafkaMessageListener
    @KafkaListener(topicPartitions = @TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true")))
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException 
            throw new RuntimeException("Oops!");
    

验证偏移的命令:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

我正在使用 kafka_2.12-0.10.2.0 和 org.springframework.kafka:spring-kafka:1.1.3.RELEASE

【问题讨论】:

【参考方案1】:

容器(通过ContainerProperties)有一个属性ackOnError,默认为true...

/**
 * Set whether or not the container should commit offsets (ack messages) where the
 * listener throws exceptions. This works in conjunction with @link #ackMode and is
 * effective only when the kafka property @code enable.auto.commit is @code false;
 * it is not applicable to manual ack modes. When this property is set to @code true
 * (the default), all messages handled will have their offset committed. When set to
 * @code false, offsets will be committed only for successfully handled messages.
 * Manual acks will be always be applied. Bear in mind that, if the next message is
 * successfully handled, its offset will be committed, effectively committing the
 * offset of the failed message anyway, so this option has limited applicability.
 * Perhaps useful for a component that starts throwing exceptions consistently;
 * allowing it to resume when restarted from the last successfully processed message.
 * @param ackOnError whether the container should acknowledge messages that throw
 * exceptions.
 */
public void setAckOnError(boolean ackOnError) 
    this.ackOnError = ackOnError;

但请记住,如果下一条消息成功,则无论如何都会提交其偏移量,这也有效地提交了失败的偏移量。

编辑

从 2.3 版开始,ackOnError 现在默认为 false

【讨论】:

感谢@Gary 的提示。你知道是否有关于如何处理 Kafka 消费者错误的最佳实践?似乎开箱即用,读取消息的错误只会被记录,然后被吞掉。 另外,我注意到代码的 cmets 似乎是矛盾的:“仅当 auto ack 为 false 时才有效;它不适用于手动 ack。”。我猜第一部分应该是真的,而不是假的? 传递失败的最佳实践可能是将坏消息保存在某个地方(可能在另一个 - 死信 - 主题中)。如果需要严格的消息排序,则可能需要不提交偏移量 (ackOnError=false) 并停止容器。 嗨@Gary,我注意到当消息的反序列化出现错误时,Spring Kafka 会陷入循环,不断尝试一次又一次地读取相同的不可反序列化的消息,并且永远不会移动到下一个偏移量。这听起来有些不一致 - 即,如果您可以反序列化消息,但它出错,Spring Kafka 将继续进行下一个偏移量。但是,如果你不能反序列化消息,Spring Kafka 基本上会卡住。这是你理解的行为吗?您对如何处理反序列化错误有什么建议吗?非常感谢! 不幸的是,kafka 反序列化发生在 Spring Kafka 看到数据之前;所以我们对此无能为力。您需要一个更智能的反序列化器来捕获异常,并可能返回一些将反序列化错误传达给应用层的值。

以上是关于Kafka 消费者异常和偏移提交的主要内容,如果未能解决你的问题,请参考以下文章

kafka重复消费的原因

Kafka-消费者-偏移量的提交方式

Kafka消费者手动提交消息偏移

Kafka 消费者正在读取重新启动时最后提交的偏移量(Java)

Kafka消费者之提交消息的偏移量

如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?