如何在 Spring Batch 中使用 Spring 事务支持

Posted

技术标签:

【中文标题】如何在 Spring Batch 中使用 Spring 事务支持【英文标题】:How to use spring transaction support with Spring Batch 【发布时间】:2019-10-18 17:44:43 【问题描述】:

我正在尝试使用 Spring Batch 从 .dat 文件中读取文件并将数据保存到数据库中。我的要求是要么插入所有数据,要么不将任何数据插入表中,即原子性。但是,使用弹簧批处理我无法达到相同的效果,它以块的形式读取数据并且只要记录正常就插入数据。如果在某些时候记录不合适并且引发了一些数据库异常,那么我想要完全回滚,而这并没有发生。假设我们在第 2051 条记录处出错,然后我的代码保存了 2050 条记录,但我想要完全回滚,如果所有数据都很好,那么应该保留所有 N 条记录。提前感谢您提供任何可以解决我的问题的帮助或相关方法...

注意:我已经在调用者方法上使用了 Spring Transactional 注释,但它不起作用,我正在读取 10 个项目的块大小的数据。

MyConfiguration.java

@Configuration
public class MyConfiguration 


    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("MyCompletionListener")
    JobCompletionNotificationListener jobCompletionNotificationListener;

    @StepScope
    @Bean(name="MyReader")
    public FlatFileItemReader<InputMapperDTO> reader(@Value("#jobParameters['fileName']") String fileName) throws IOException 
    
        FlatFileItemReader<InputMapperDTO> newBean = new FlatFileItemReader<>();
        newBean.setName("MyReader");
        newBean.setResource(new InputStreamResource(FileUtils.openInputStream(new File(fileName))));
        newBean.setLineMapper(lineMapper());
        newBean.setLinesToSkip(1);
        return newBean;
    

    @Bean(name="MyLineMapper")
    public DefaultLineMapper<InputMapperDTO> lineMapper() 
    
        DefaultLineMapper<InputMapperDTO> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(lineTokenizer());
        Reader reader = new Reader();
        lineMapper.setFieldSetMapper(reader);
        return lineMapper;
    

    @Bean(name="MyTokenizer")
    public DelimitedLineTokenizer lineTokenizer() 
    
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setDelimiter("|");
        tokenizer.setNames("InvestmentAccountUniqueIdentifier", "BaseCurrencyUniqueIdentifier",
                "OperatingCurrencyUniqueIdentifier", "PricingHierarchyUniqueIdentifier", "InvestmentAccountNumber",
                "DummyAccountIndicator", "InvestmentAdvisorCompanyNumberLegacy","HighNetWorthAccountTypeCode");
        tokenizer.setIncludedFields(0, 5, 7, 13, 29, 40, 49,75);
        return tokenizer;
    

    @Bean(name="MyBatchProcessor")
    public ItemProcessor<InputMapperDTO, FinalDTO> processor() 
    
        return new Processor();
    

    @Bean(name="MyWriter")
    public ItemWriter<FinalDTO> writer() 
    
        return new Writer();
    

    @Bean(name="MyStep")
    public Step step1() throws IOException 
    
        return stepBuilderFactory.get("MyStep")
                .<InputMapperDTO, FinalDTO>chunk(10)
                .reader(this.reader(null))
                .processor(this.processor())
                .writer(this.writer())
                .build();
    

    @Bean(name=MyJob")
    public Job importUserJob(@Autowired @Qualifier("MyStep") Step step1) 
    
        return jobBuilderFactory
                .get("MyJob"+new Date())
                .incrementer(new RunIdIncrementer())
                .listener(jobCompletionNotificationListener)
                .flow(step1)
                .end()
                .build();
    


Writer.java

public class Writer implements ItemWriter<FinalDTO>


    @Autowired
    SomeRepository someRepository;

    @Override
    public void write(List<? extends FinalDTO> listOfObjects) throws Exception 
    
        someRepository.saveAll(listOfObjects);      
    


JobCompletionNotificationListener.java

public class JobCompletionNotificationListener extends JobExecutionListenerSupport


    @Override
    public void afterJob(JobExecution jobExecution) 
    
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) 
        
            System.err.println("****************************************");
            System.err.println("*****    Batch Job Completed      ******");
            System.err.println("****************************************");
        
        else
        
            System.err.println("****************************************");
            System.err.println("*****    Batch Job Failed      ******");
            System.err.println("****************************************");
        
    


MyCallerMethod

    @Transactional
    public String processFile(String datFile) throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException 
    
        long st = System.currentTimeMillis();

        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("fileName",datFile);
        builder.addDate("date", new Date());
        jobLauncher.run(job, builder.toJobParameters());

        System.err.println("****************************************");
        System.err.println("*****    Total time consumed = "+(System.currentTimeMillis()-st)+"      ******");
        System.err.println("****************************************");
        return response;
    

【问题讨论】:

可能重复:***.com/questions/29303155/… 非常感谢,给了我一些基本的调试思路。 【参考方案1】:

我试过的操作没有批量提供。根据我的要求,我已经实现了自定义删除,它会在任何步骤失败时刷新数据库。

【讨论】:

以上是关于如何在 Spring Batch 中使用 Spring 事务支持的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Batch 中使用 Spring 事务支持

如何开始使用 Spring Batch? [关闭]

如何在 Spring Batch 中使用 Resource ItemReader

Spring Batch 无法获取 last_insert_id();嵌套异常是 java.sql.SQLException: Lock wait timeout exceeded;

如何使用 CommandLineJobRunner 调用嵌入在 Spring Boot 应用程序中的 Spring Batch 作业

如何评估在应用程序中是使用spring batch还是scheduler?