Spring Batch 处理记录,但不将它们插入数据库

Posted

技术标签:

【中文标题】Spring Batch 处理记录,但不将它们插入数据库【英文标题】:Spring Batch processes the records, but is not inserting them into the database 【发布时间】:2021-06-26 06:29:29 【问题描述】:

问题

当我开始使用不同的线程同时多次运行同一个作业时,发生了必须插入的记录,当它们被处理时,来自写入器的记录没有被插入进入数据库。当我同时运行两组数据时,批处理正确运行:

记录已处理的 dataSet1:3606(预期为 3606)。 记录已处理的 dataSet2:1776(预期为 1776)。

从下图中可以看出,Spring Batch 读写的记录数符合预期:

上下文

在这个项目中,我使用 mysql 作为数据库和 Hibernate。

一些代码

批处理配置、作业和步骤
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private StepSkipListener stepSkipListener;

    @Autowired
    private MainJobExecutionListener mainJobExecutionListener;

    @Bean
    public TaskExecutor taskExecutor()
    
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setThreadNamePrefix("batch-thread-");

        return taskExecutor;
    

    @Bean
    public JobLauncher jobLauncher() throws Exception
    
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(taskExecutor());
        jobLauncher.afterPropertiesSet();

        return jobLauncher;
    

    @Bean
    public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
    
        return stepBuilderFactory.get("step")
                .<List<ExcelLoad>, Invoice>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                .listener(stepSkipListener)
                .build();
    

    @Bean
    public Job mainJob(Step mainStep)
    
        return jobBuilderFactory.get("mainJob")
                                .listener(mainJobExecutionListener)
                                .incrementer(new RunIdIncrementer())
                                .start(mainStep)
                                .build();
    

作家
@Override
public void write(List<? extends Invoice> list)

    invoiceRepository.saveAll(list);

存储库
@Repository
public interface InvoiceRepository extends JpaRepository<Invoice, Integer>

属性
spring.main.allow-bean-definition-overriding=true
spring.batch.initialize-schema=always
spring.batch.job.enabled=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/bd_dev?autoReconnect=true&useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=Europe/Paris&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=password
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5InnoDBDialect

在使用单独的线程之前,已处理的记录已正确插入到数据库中。会发生什么?

【问题讨论】:

【参考方案1】:

在使用单独的线程之前,已处理的记录已正确插入到数据库中。会发生什么?

如果您决定使用多线程步骤,则需要确保您的批处理工件(读取器、写入器等)是线程安全的。从您共享的内容来看,write 方法在线程之间不同步,因此不是线程安全的。这在文档的Multi-threaded Step 部分进行了说明。

您需要同步它(通过使用synchronized 关键字,或使用Lock 等)或将您的编写器包装在SynchronizedItemStreamWriter 中。

【讨论】:

使用 ItemWriter 实现 @Bean PlatformTransactionManager 对我有用,问题是这是否有我不知道的风险 我从您的问题中了解到的是:When I've started to use separate threads [..] from the Writer aren't being inserted into the database 意味着事情在单线程步骤中按预期工作,但在您开始使用多线程时却没有。所以对我来说,事务管理器不是问题,而是“丢失”未写入的记录,这可能是由于多线程(即一个线程正在覆盖另一个线程的写入)。因此,我的回答是确保一切都是线程安全的。如果是这种情况,则没有风险。【参考方案2】:

为了帮助实施,如果有人提出这个问题,我分享在我的情况下已经解决问题的代码:

批处理配置、作业和步骤
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private StepSkipListener stepSkipListener;

    @Autowired
    private MainJobExecutionListener mainJobExecutionListener;

    @Bean
    public PlatformTransactionManager getTransactionManager()
    
        return new JpaTransactionManager();
    

    @Bean
    public TaskExecutor taskExecutor()
    
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setThreadNamePrefix("batch-thread-");

        return taskExecutor;
    

    @Bean
    public JobLauncher jobLauncher() throws Exception
    
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(taskExecutor());
        jobLauncher.afterPropertiesSet();

        return jobLauncher;
    

    @Bean
    public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
    
        return stepBuilderFactory.get("step")
                .transactionManager(jpaTransactionManager)
                .<List<ExcelLoad>, Invoice>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                .listener(stepSkipListener)
                .build();
    

    @Bean
    public Job mainJob(Step mainStep)
    
        return jobBuilderFactory.get("mainJob")
                                .listener(mainJobExecutionListener)
                                .incrementer(new RunIdIncrementer())
                                .start(mainStep)
                                .build();
    

作家
@Component
public class WriterImpl implements ItemWriter<Invoice>

    @Autowired
    private InvoiceRepository invoiceRepository;

    @Override
    public void write(List<? extends Invoice> list)
    
        invoiceRepository.saveAll(list);
    

【讨论】:

以上是关于Spring Batch 处理记录,但不将它们插入数据库的主要内容,如果未能解决你的问题,请参考以下文章

AFTER DELETE 触发器触发但不将记录插入审计表

PHP:在文件中记录错误但不将它们显示给 POST/GET 请求

让 Maven2 将资源复制到构建目录,但不将它们捆绑在 JAR 中

Spring Batch - MongoItemReader 未读取所有记录

Spring Batch:ItemProcessor 不处理所有记录

Spring batch - 项目阅读器知道阅读器处理的最后一条记录