Spring Batch Kafka Kafka 到数据库作业

Posted

技术标签:

【中文标题】Spring Batch Kafka Kafka 到数据库作业【英文标题】:Spring Batch Kafka Kafka to Database Job 【发布时间】:2019-11-03 23:31:37 【问题描述】:

我需要一个 spring-batch ItemReader 来使用 Kafka 消息,其结果将被进一步处理和写入。

这是我实现的一个项目阅读器:

public abstract class KafkaItemReader<T> implements ItemReader<List<T>> 
  public abstract KafkaConsumer<String, T> getKafkaConsumer();

  public abstract String getTopic();

  public abstract long getPollingTime();

  @Override
  public List<T> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException 
    Iterator<ConsumerRecord<String, T>> iterator = getKafkaConsumer()
        .poll(Duration.ofMillis(getPollingTime()))
        .records(getTopic())
        .iterator();
    List<T> records = new ArrayList<>();
    while (iterator.hasNext()) 
      records.add(iterator.next().value());
    
    return records;
  

这些是春季批处理作业和步骤的以下bean:

  @Bean
  public ItemWriter<List<DbEntity>> databaseWriter(DataSource dataSource) 
    //some item writer that needs to be implmented
    return null;
  


  @Bean
  public Step kafkaToDatabaseStep(KafkaItemReader kafkaItemReader, //implementation of KafkaItemReader
                                  StepBuilderFactory stepBuilderFactory,
                                  DataSource dataSource) 

    return stepBuilderFactory
        .get("kafkaToDatabaseStep")
        .allowStartIfComplete(true)
        .<List<KafkaRecord>, List<DbEntity>>chunk(100)
        .reader(kafkaItemReader)
        .processor(itemProcessor()) //List<KafkaRecord> to List<DbEntity> converter
        .writer(databaseWriter(dataSource))
        .build();
  


  @Bean
  public Job kafkaToDatabaseJob(
      @Qualifier("kafkaToDatabaseStep") Step step) 
    return jobBuilderFactory.get("kafkaToDatabaseJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  

这里我不知道:

    如何提交写入器中读取消息的偏移量,因为我只想在完成记录处理后提交。 如何在我的场景中使用 JdbcBatchItemWriter 作为 ItemWriter。

【问题讨论】:

【参考方案1】:

即将推出的 Spring Batch v4.2 GA 将支持读取/写入数据到 Apache Kafka 主题。您已经可以使用4.2.0.M2 release 进行尝试了。

您还可以查看 Josh Long 的 Spring Tips installment。

【讨论】:

以上是关于Spring Batch Kafka Kafka 到数据库作业的主要内容,如果未能解决你的问题,请参考以下文章

Kafka集成优化

kafka producer的batch.size和linger.ms

kafka 吞吐量为什么这么大?

kafka如何动态消费新增topic主题

强制终止后 Spring Batch 不更新作业存储库

kafka 参数 batch.sizelinger.ms, max.request.size message.max.bytesfetch.....