如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?
Posted
技术标签:
【中文标题】如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?【英文标题】:How to disable logging all messages in a Kafka batch in case of an exception? 【发布时间】:2021-09-11 21:24:29 【问题描述】:将@KafkaListener
与批处理一起使用时,错误处理程序会记录整个批处理的内容(所有消息)以防出现异常。
我怎样才能使它不那么冗长?我想避免在日志文件中包含所有消息,只看到实际的异常。
这是我的消费者当前的样子的一个最小示例:
@Component
class TestConsumer
@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String>
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10000
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
factory.isBatchListener = true
return factory
@KafkaListener(
topics = ["myTopic"],
containerFactory = "kafkaBatchListenerContainerFactory"
)
fun batchListen(values: List<ConsumerRecord<String, String>>)
// Something that might throw an exception in rare cases.
【问题讨论】:
【参考方案1】:你用的是什么版本?
此容器属性是在 2.2.14 中添加的。
/**
* Set to false to log @code record.toString() in log messages instead
* of @code topic-partition@offset.
* @param onlyLogRecordMetadata false to log the entire record.
* @since 2.2.14
*/
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata)
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
一直是 true by default since version 2.7(这就是为什么 javadocs 现在这么读的原因)。
这是以前的 javadoc:
/**
* Set to true to only log @code topic-partition@offset in log messages instead
* of @code record.toString().
* @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
* @since 2.2.14
*/
此外,从 2.5 版开始,您可以在错误处理程序上设置日志级别:
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel)
Assert.notNull(logLevel, "'logLevel' cannot be null");
this.logLevel = logLevel;
【讨论】:
谢谢!我使用的是 Spring Boot 2.5.1 版,所以在我的情况下,onlyLogRecordMetadata
应该已经是 true
,但似乎不是。如何访问我的ConsumerProperties
来检查/更改它?
自动连接 KafkaListenerEndpointRegistry
然后 registry.getListenerContainer(listenerId).getContainerProperties()
。
谢谢,成功了。是的,这已经是真的了。我会尝试找出日志的情况。
啊,明白了。对于我的最小示例,我将kafkaBatchListenerContainerFactory
复制到我的应用程序代码中,该代码使用Spring/Kafka 版本2.7.3
。在那之前,我是从一个自建库中调用它的一个版本,它仍然是版本2.5.1
。这就是为什么isOnlyLogRecordMetadata
是false
并且在我最近的测试中是true
。再次感谢,现在一切都说得通了。 :-)以上是关于如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?的主要内容,如果未能解决你的问题,请参考以下文章