Spring Batch Kafka Kafka 到数据库作业
Posted
技术标签:
【中文标题】Spring Batch Kafka Kafka 到数据库作业【英文标题】:Spring Batch Kafka Kafka to Database Job 【发布时间】:2019-11-03 23:31:37 【问题描述】:我需要一个 spring-batch ItemReader 来使用 Kafka 消息,其结果将被进一步处理和写入。
这是我实现的一个项目阅读器:
public abstract class KafkaItemReader<T> implements ItemReader<List<T>>
public abstract KafkaConsumer<String, T> getKafkaConsumer();
public abstract String getTopic();
public abstract long getPollingTime();
@Override
public List<T> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException
Iterator<ConsumerRecord<String, T>> iterator = getKafkaConsumer()
.poll(Duration.ofMillis(getPollingTime()))
.records(getTopic())
.iterator();
List<T> records = new ArrayList<>();
while (iterator.hasNext())
records.add(iterator.next().value());
return records;
这些是春季批处理作业和步骤的以下bean:
@Bean
public ItemWriter<List<DbEntity>> databaseWriter(DataSource dataSource)
//some item writer that needs to be implmented
return null;
@Bean
public Step kafkaToDatabaseStep(KafkaItemReader kafkaItemReader, //implementation of KafkaItemReader
StepBuilderFactory stepBuilderFactory,
DataSource dataSource)
return stepBuilderFactory
.get("kafkaToDatabaseStep")
.allowStartIfComplete(true)
.<List<KafkaRecord>, List<DbEntity>>chunk(100)
.reader(kafkaItemReader)
.processor(itemProcessor()) //List<KafkaRecord> to List<DbEntity> converter
.writer(databaseWriter(dataSource))
.build();
@Bean
public Job kafkaToDatabaseJob(
@Qualifier("kafkaToDatabaseStep") Step step)
return jobBuilderFactory.get("kafkaToDatabaseJob")
.incrementer(new RunIdIncrementer())
.flow(step)
.end()
.build();
这里我不知道:
-
如何提交写入器中读取消息的偏移量,因为我只想在完成记录处理后提交。
如何在我的场景中使用 JdbcBatchItemWriter 作为 ItemWriter。
【问题讨论】:
【参考方案1】:即将推出的 Spring Batch v4.2 GA 将支持读取/写入数据到 Apache Kafka 主题。您已经可以使用4.2.0.M2 release 进行尝试了。
您还可以查看 Josh Long 的 Spring Tips installment。
【讨论】:
以上是关于Spring Batch Kafka Kafka 到数据库作业的主要内容,如果未能解决你的问题,请参考以下文章
kafka producer的batch.size和linger.ms
kafka 参数 batch.sizelinger.ms, max.request.size message.max.bytesfetch.....