Spring kafka记录标题未正确填充
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring kafka记录标题未正确填充相关的知识,希望对你有一定的参考价值。
我在应用程序中使用spring kafka(版本:2.3.4.RELEASE)。我试图增加一个计数器,并在重试时将其作为记录中的自定义标头传递。在第一次重试中,将添加标题。但是在第二次重试中,当调用handle()时,RecordHeader为空。
配置类:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(false);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new KafkaErrorHandler(
new RetryPublishingRecoverer(this.kafkaConfigProperties.getRetryTopic(),
retryKafkaTemplate()),
this.kafkaConfigProperties.getRetryCount(), new DeadLetterPublishingRecoverer(
this.kafkaConfigProperties.getDeadLetterTopic(), deadLetterKafkaTemplate())));
return factory;
}
RetryPublishingRecoverer类:
public class RetryPublishingRecoverer extends DeadLetterPublishingRecoverer {
private static final String RETRY_COUNT_HEADER = "RETRY_COUNT";
public PriceRetryPublishingRecoverer(String retryTopic, KafkaTemplate<Object, Object> template) {
super(template, (consumerRecord, exception) -> new TopicPartition(retryTopic, consumerRecord.partition()));
}
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
Headers headers = record.headers();
var retryCounter = new Object() {
int count = 0;
};
for (Header header : headers) {
if (RETRY_COUNT_HEADER.equals(header.key())) {
retryCounter.count = Integer.parseInt(new String(header.value()));
break;
}
}
headers.remove(RETRY_COUNT_HEADER);
headers.add(new RecordHeader(RETRY_COUNT_HEADER,
String.valueOf(++retryCounter.count).getBytes(StandardCharsets.UTF_8)));
super.accept(record, exception);
}
}
ErrorHandler类:
public class KafkaErrorHandler extends SeekToCurrentErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(KafkaErrorHandler.class);
private static final String ERROR_MESSAGE = "Exception is null in handle method";
private BiConsumer<ConsumerRecord<?, ?>, Exception> deadLetterRecoverer;
public KafkaErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> retryRecoverer, int maxFailures,
BiConsumer<ConsumerRecord<?, ?>, Exception> deadLetterRecoverer) {
super(retryRecoverer, maxFailures);
this.deadLetterRecoverer = deadLetterRecoverer;
}
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
if (exception != null) {
if (DeserializationException.class == exception.getClass() || (exception.getCause() != null
&& exception.getCause().getClass() == MethodArgumentNotValidException.class)) {
records.forEach(record -> {
this.deadLetterRecoverer.accept(record, exception);
});
SeekUtils.doSeeks(records, consumer, exception, true, (c, e) -> {
return true;
}, this.logger);
} else {
super.handle(exception, records, consumer, container);
}
} else {
LOG.warn(ERROR_MESSAGE);
}
}
}
可能是什么问题?任何帮助将不胜感激。
答案
添加这样的标头在重试期间无济于事;当前记录将被丢弃,并从代理中获取一条新记录。
您为什么需要这样做?
标准SeekToCurrentErrorHandler
维护FailedRecordTracker
中当前记录的重试状态。
以上是关于Spring kafka记录标题未正确填充的主要内容,如果未能解决你的问题,请参考以下文章
Spring boot:thymeleaf 没有正确渲染片段
在 oracle 表单 10g 中的 LOV 填充后未触发后查询触发器
保存对象时未填充 Spring Boot JPA@CreatedDate @LastModifiedDate