KafkaConsumer:`seekToEnd()` 不会让消费者从最新的偏移量消费

Posted

技术标签:

【中文标题】KafkaConsumer:`seekToEnd()` 不会让消费者从最新的偏移量消费【英文标题】:KafkaConsumer: `seekToEnd()` does not make consumer consume from latest offset 【发布时间】:2022-01-09 20:40:52 【问题描述】:

我有以下代码

class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) 

    fun run() 
        consumer.seekToEnd(emptyList())
        val pollDuration = 30 // seconds

        while (true) 
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
            
        
    

消费者订阅的主题不断接收记录。有时,消费者会由于处理步骤而崩溃。当消费者然后重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为seekToEnd() 方法可以确保这一点。但是,似乎该方法根本没有效果。消费者从它崩溃的偏移量开始消费。

seekToEnd()的正确使用方法是什么?

编辑:使用以下配置创建消费者

fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> 
    val props = setupConfig(valueDeserializer)
    Common.setupConsumerSecurityProtocol(props)
    return createConsumer(props)


fun setupConfig(valueDeserializer: String): Properties 
    // Configuration setup
    val props = Properties()

    props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
    props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl

    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
    props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"

    props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
    props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs

    props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"

    return props


fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> 
    val consumer = KafkaConsumer<String, T>(props)
    consumer.subscribe(listOf(config.kafka.inputTopic))
    return consumer

【问题讨论】:

你应该在投票前尝试 commitSync 另外,如果你禁用自动提交并设置auto.offset.reset=latest,重启时它总是从主题的末尾开始 auto.offset.reset 配置仅在消费者组没有在某处提交有效偏移量时才会启动,如下所述:***.com/a/32392174/11724337 为什么您认为在 poll 之前添加 commitSync 会有所帮助? 如果您不自动提交(并且您的代码不会自行提交),则不会有任何存储的偏移量,因此它将始终寻求该设置。否则,如果您在轮询之前提交,那么您将保证为消费者组存储结束偏移量 【参考方案1】:

seekToEnd 方法需要您计划让消费者从最后读取的实际分区的信息(在 Kafka 术语中为 TopicPartition)。

我不熟悉 Kotlin API,但检查 KafkaConsumer's method seekToEnd 上的 JavaDocs 你会看到,它要求提供 TopicPartitions 的集合。

由于您目前使用的是emptyList(),所以它不会产生任何影响,就像您观察到的那样。

【讨论】:

来自文档 - 如果没有提供分区,则寻找所有当前分配的分区的最终偏移量 hm... 我应该自己阅读文档到最后。所以,我想我的回答在这里无济于事。【参考方案2】:

我找到了解决办法!

我需要添加一个虚拟民意调查作为消费者初始化过程的一部分。由于几个 Kafka 方法是惰性评估的,因此有必要使用虚拟轮询将分区分配给消费者。如果没有虚拟轮询,消费者会尝试寻找空分区的末尾。结果,seekToEnd() 无效。

重要的是,虚拟轮询持续时间足够长,以便分配分区。例如consumer.poll((Duration.ofSeconds(1)),在程序进入下一个方法调用(即seekToEnd())之前,没有时间分配分区。

工作代码可能看起来像这样

class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) 

    fun run() 
        // Initialization 
        val pollDuration = 30 // seconds
        consumer.poll((Duration.ofSeconds(pollDuration)) // Dummy poll to get assigned partitions

        // Seek to end and commit new offset
        consumer.seekToEnd(emptyList())
        consumer.commitSync() 

        while (true) 
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
            
        
    

【讨论】:

以上是关于KafkaConsumer:`seekToEnd()` 不会让消费者从最新的偏移量消费的主要内容,如果未能解决你的问题,请参考以下文章

KafkaConsumer 架构设计剖析和源码全流程详解

KafkaConsumer 架构设计剖析和源码全流程详解

KafkaConsumer 架构设计剖析和源码全流程详解

kafkaconsumer SimpleExample

KafkaConsumer 对于多线程访问 pyspark 是不安全的

kafka-python KafkaConsumer 多分区提交偏移量