如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?

Posted

技术标签:

【中文标题】如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?【英文标题】:How to disable logging all messages in a Kafka batch in case of an exception? 【发布时间】:2021-09-11 21:24:29 【问题描述】:

@KafkaListener 与批处理一起使用时,错误处理程序会记录整个批处理的内容(所有消息)以防出现异常。

我怎样才能使它不那么冗长?我想避免在日志文件中包含所有消息,只看到实际的异常。

这是我的消费者当前的样子的一个最小示例:

@Component
class TestConsumer 
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> 
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) 
        // Something that might throw an exception in rare cases.
    

【问题讨论】:

【参考方案1】:

你用的是什么版本?

此容器属性是在 2.2.14 中添加的。

    /**
     * Set to false to log @code record.toString() in log messages instead
     * of @code topic-partition@offset.
     * @param onlyLogRecordMetadata false to log the entire record.
     * @since 2.2.14
     */
    public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) 
        this.onlyLogRecordMetadata = onlyLogRecordMetadata;
    

一直是 true by default since version 2.7(这就是为什么 javadocs 现在这么读的原因)。

这是以前的 javadoc:

    /**
     * Set to true to only log @code topic-partition@offset in log messages instead
     * of @code record.toString().
     * @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
     * @since 2.2.14
     */

此外,从 2.5 版开始,您可以在错误处理程序上设置日志级别:

    /**
     * Set the level at which the exception thrown by this handler is logged.
     * @param logLevel the level (default ERROR).
     */
    public void setLogLevel(KafkaException.Level logLevel) 
        Assert.notNull(logLevel, "'logLevel' cannot be null");
        this.logLevel = logLevel;
    

【讨论】:

谢谢!我使用的是 Spring Boot 2.5.1 版,所以在我的情况下,onlyLogRecordMetadata 应该已经是 true,但似乎不是。如何访问我的ConsumerProperties 来检查/更改它? 自动连接 KafkaListenerEndpointRegistry 然后 registry.getListenerContainer(listenerId).getContainerProperties() 谢谢,成功了。是的,这已经是真的了。我会尝试找出日志的情况。 啊,明白了。对于我的最小示例,我将kafkaBatchListenerContainerFactory 复制到我的应用程序代码中,该代码使用Spring/Kafka 版本2.7.3。在那之前,我是从一个自建库中调用它的一个版本,它仍然是版本2.5.1。这就是为什么isOnlyLogRecordMetadatafalse 并且在我最近的测试中是true。再次感谢,现在一切都说得通了。 :-)

以上是关于如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?的主要内容,如果未能解决你的问题,请参考以下文章

发生了未经处理的异常

Spring自带Kafka消息异常处理

微服务如何记录kafka节点异常

处理asp.net核心中的异常?

异常处理——把异常记录下来,log4j的使用

如果写入元素发生异常,如何正确处理 Spring Batch 步骤编写器异常以便继续处理其他元素?