Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。相关的知识,希望对你有一定的参考价值。
我正在使用Spring Cloud Webflux和Spring Cloud Streams Functional Interfaces来处理我的kafka处理。
如果我按顺序进行处理,如果我杀了应用程序,它就会返回处理的消息,这与预期的一样,因为没有消息丢失。然而,如果我尝试做并行处理,它似乎在向Kafka确认,这是可以理解的,因为它现在是一个单独的线程,因此想转为手动确认。
我的代码。
- application.yml (相关部分)
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
minPartitionCount: 2
bindings:
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
consumer:
autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
- 收件人代码
public Consumer<Flux<Message<String>>> receiver() throws IOException {
return (sink -> {
sink
.onBackpressureBuffer()
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe((record)->{
Flux<Action> executor = new
//Internal code which does transformation and provides a flux for execution (names changed)
IncomingMessage().process(record);
if(executor != null) {
Disposable disposable=null;
disposable= executor.subscribe(
(action)->{
try {
//Process execute does the processing on the modified data (names changed)
Process.execute(action);
Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment !=null) {
acknowledgment.acknowledge();
}
}
catch(Exception e) {
log.fatal(e.getMsg());
}
},
(e)->{
log.fatal(e.getMsg());
}
});
if(disposable != null) {
disposable.dispose();
}
}
});
});
}
这里的线路 record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
总是给出null,所以我假设 autoCommitOffset: false
是不工作的,我试着把下面的配置也放在绑定部分,但没有用。
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
autoCommitOffset: false
我的要求是,如果我杀死应用程序,即使在并行场景中,它也应该继续读取第一个未确认消息的消息。
没有得到确认头的问题是由于标签的位置不正确。因为这纯粹是一个kafka binder属性。添加了以下属性
spring:
cloud:
stream:
kafka:
bindings:
receiver-in-0:
consumer:
autoCommitOffset: false
通道名称应该与函数调用中的通道名称相同,或者可以设置默认值。
spring:
cloud:
stream:
kafka:
default:
consumer:
autoCommitOffset: false
然而并行处理传入的通量可能不是一个好主意,因为消息可能会被丢弃,因为一些后来的消息可以得到确认。这将需要更多的逻辑来确认,而不是仅仅设置参数确认。
以上是关于Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Stream Supplier 功能模型
Spring Cloud搭建手册——Spring Cloud Config