Spring kafka记录标题未正确填充

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring kafka记录标题未正确填充相关的知识,希望对你有一定的参考价值。

我在应用程序中使用spring kafka(版本:2.3.4.RELEASE)。我试图增加一个计数器,并在重试时将其作为记录中的自定义标头传递。在第一次重试中,将添加标题。但是在第二次重试中,当调用handle()时,RecordHeader为空。

配置类:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new KafkaErrorHandler(
            new RetryPublishingRecoverer(this.kafkaConfigProperties.getRetryTopic(),
                    retryKafkaTemplate()),
            this.kafkaConfigProperties.getRetryCount(), new DeadLetterPublishingRecoverer(
                    this.kafkaConfigProperties.getDeadLetterTopic(), deadLetterKafkaTemplate())));
    return factory;
}

RetryPublishingRecoverer类:

public class RetryPublishingRecoverer extends DeadLetterPublishingRecoverer {

private static final String RETRY_COUNT_HEADER = "RETRY_COUNT";

public PriceRetryPublishingRecoverer(String retryTopic, KafkaTemplate<Object, Object> template) {
    super(template, (consumerRecord, exception) -> new TopicPartition(retryTopic, consumerRecord.partition()));
}

public void accept(ConsumerRecord<?, ?> record, Exception exception) {
    Headers headers = record.headers();
    var retryCounter = new Object() {
        int count = 0;
    };
    for (Header header : headers) {
        if (RETRY_COUNT_HEADER.equals(header.key())) {
            retryCounter.count = Integer.parseInt(new String(header.value()));
            break;
        }
    }
    headers.remove(RETRY_COUNT_HEADER);
    headers.add(new RecordHeader(RETRY_COUNT_HEADER,
            String.valueOf(++retryCounter.count).getBytes(StandardCharsets.UTF_8)));
    super.accept(record, exception);
}
}

ErrorHandler类:

public class KafkaErrorHandler extends SeekToCurrentErrorHandler {

private static final Logger LOG = LoggerFactory.getLogger(KafkaErrorHandler.class);
private static final String ERROR_MESSAGE = "Exception is null in handle method";

private BiConsumer<ConsumerRecord<?, ?>, Exception> deadLetterRecoverer;

public KafkaErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> retryRecoverer, int maxFailures,
        BiConsumer<ConsumerRecord<?, ?>, Exception> deadLetterRecoverer) {
    super(retryRecoverer, maxFailures);
    this.deadLetterRecoverer = deadLetterRecoverer;
}

public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
        MessageListenerContainer container) {
    if (exception != null) {
        if (DeserializationException.class == exception.getClass() || (exception.getCause() != null
                && exception.getCause().getClass() == MethodArgumentNotValidException.class)) {
            records.forEach(record -> {
                this.deadLetterRecoverer.accept(record, exception);
            });
            SeekUtils.doSeeks(records, consumer, exception, true, (c, e) -> {
                return true;
            }, this.logger);
        } else {
            super.handle(exception, records, consumer, container);
        }
    } else {
        LOG.warn(ERROR_MESSAGE);
    }
}
}

可能是什么问题?任何帮助将不胜感激。

答案

添加这样的标头在重试期间无济于事;当前记录将被丢弃,并从代理中获取一条新记录。

您为什么需要这样做?

标准SeekToCurrentErrorHandler维护FailedRecordTracker中当前记录的重试状态。

以上是关于Spring kafka记录标题未正确填充的主要内容,如果未能解决你的问题,请参考以下文章

Spring boot:thymeleaf 没有正确渲染片段

在 oracle 表单 10g 中的 LOV 填充后未触发后查询触发器

保存对象时未填充 Spring Boot JPA@CreatedDate @LastModifiedDate

给定最终块未正确填充

使用 Spring boot webflux reactive 记录未持久化到 R2DB

多对多 Spring Boot JPA 未填充多对多表