如何在 Spring Batch 中使用 Spring 事务支持
Posted
技术标签:
【中文标题】如何在 Spring Batch 中使用 Spring 事务支持【英文标题】:How to use spring transaction support with Spring Batch 【发布时间】:2019-10-18 17:44:43 【问题描述】:我正在尝试使用 Spring Batch 从 .dat 文件中读取文件并将数据保存到数据库中。我的要求是要么插入所有数据,要么不将任何数据插入表中,即原子性。但是,使用弹簧批处理我无法达到相同的效果,它以块的形式读取数据并且只要记录正常就插入数据。如果在某些时候记录不合适并且引发了一些数据库异常,那么我想要完全回滚,而这并没有发生。假设我们在第 2051 条记录处出错,然后我的代码保存了 2050 条记录,但我想要完全回滚,如果所有数据都很好,那么应该保留所有 N 条记录。提前感谢您提供任何可以解决我的问题的帮助或相关方法...
注意:我已经在调用者方法上使用了 Spring Transactional 注释,但它不起作用,我正在读取 10 个项目的块大小的数据。
MyConfiguration.java
@Configuration
public class MyConfiguration
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("MyCompletionListener")
JobCompletionNotificationListener jobCompletionNotificationListener;
@StepScope
@Bean(name="MyReader")
public FlatFileItemReader<InputMapperDTO> reader(@Value("#jobParameters['fileName']") String fileName) throws IOException
FlatFileItemReader<InputMapperDTO> newBean = new FlatFileItemReader<>();
newBean.setName("MyReader");
newBean.setResource(new InputStreamResource(FileUtils.openInputStream(new File(fileName))));
newBean.setLineMapper(lineMapper());
newBean.setLinesToSkip(1);
return newBean;
@Bean(name="MyLineMapper")
public DefaultLineMapper<InputMapperDTO> lineMapper()
DefaultLineMapper<InputMapperDTO> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(lineTokenizer());
Reader reader = new Reader();
lineMapper.setFieldSetMapper(reader);
return lineMapper;
@Bean(name="MyTokenizer")
public DelimitedLineTokenizer lineTokenizer()
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter("|");
tokenizer.setNames("InvestmentAccountUniqueIdentifier", "BaseCurrencyUniqueIdentifier",
"OperatingCurrencyUniqueIdentifier", "PricingHierarchyUniqueIdentifier", "InvestmentAccountNumber",
"DummyAccountIndicator", "InvestmentAdvisorCompanyNumberLegacy","HighNetWorthAccountTypeCode");
tokenizer.setIncludedFields(0, 5, 7, 13, 29, 40, 49,75);
return tokenizer;
@Bean(name="MyBatchProcessor")
public ItemProcessor<InputMapperDTO, FinalDTO> processor()
return new Processor();
@Bean(name="MyWriter")
public ItemWriter<FinalDTO> writer()
return new Writer();
@Bean(name="MyStep")
public Step step1() throws IOException
return stepBuilderFactory.get("MyStep")
.<InputMapperDTO, FinalDTO>chunk(10)
.reader(this.reader(null))
.processor(this.processor())
.writer(this.writer())
.build();
@Bean(name=MyJob")
public Job importUserJob(@Autowired @Qualifier("MyStep") Step step1)
return jobBuilderFactory
.get("MyJob"+new Date())
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)
.flow(step1)
.end()
.build();
Writer.java
public class Writer implements ItemWriter<FinalDTO>
@Autowired
SomeRepository someRepository;
@Override
public void write(List<? extends FinalDTO> listOfObjects) throws Exception
someRepository.saveAll(listOfObjects);
JobCompletionNotificationListener.java
public class JobCompletionNotificationListener extends JobExecutionListenerSupport
@Override
public void afterJob(JobExecution jobExecution)
if(jobExecution.getStatus() == BatchStatus.COMPLETED)
System.err.println("****************************************");
System.err.println("***** Batch Job Completed ******");
System.err.println("****************************************");
else
System.err.println("****************************************");
System.err.println("***** Batch Job Failed ******");
System.err.println("****************************************");
MyCallerMethod
@Transactional
public String processFile(String datFile) throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException
long st = System.currentTimeMillis();
JobParametersBuilder builder = new JobParametersBuilder();
builder.addString("fileName",datFile);
builder.addDate("date", new Date());
jobLauncher.run(job, builder.toJobParameters());
System.err.println("****************************************");
System.err.println("***** Total time consumed = "+(System.currentTimeMillis()-st)+" ******");
System.err.println("****************************************");
return response;
【问题讨论】:
可能重复:***.com/questions/29303155/… 非常感谢,给了我一些基本的调试思路。 【参考方案1】:我试过的操作没有批量提供。根据我的要求,我已经实现了自定义删除,它会在任何步骤失败时刷新数据库。
【讨论】:
以上是关于如何在 Spring Batch 中使用 Spring 事务支持的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spring Batch 中使用 Spring 事务支持
如何在 Spring Batch 中使用 Resource ItemReader
Spring Batch 无法获取 last_insert_id();嵌套异常是 java.sql.SQLException: Lock wait timeout exceeded;
如何使用 CommandLineJobRunner 调用嵌入在 Spring Boot 应用程序中的 Spring Batch 作业