Spring批处理JdbcPagingItemReader无法读取所有事件
Posted
技术标签:
【中文标题】Spring批处理JdbcPagingItemReader无法读取所有事件【英文标题】:Spring batch JdbcPagingItemReader not able to read all events 【发布时间】:2019-05-31 09:26:35 【问题描述】:我有如下弹簧批处理应用程序(表名和查询被编辑为一些通用名称)
当我执行这个程序时,它能够读取 7500 个事件,即 3 倍的块大小,并且无法读取 oracle 数据库中的剩余记录。我有一个包含 5000 万条记录的表,并且能够复制到另一个 noSql 数据库。
@EnableBatchProcessing
@SpringBootApplication
@EnableAutoConfiguration
public class MultiThreadPagingApp extends DefaultBatchConfigurer
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public DataSource dataSource()
final DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
dataSource.setUrl("jdbc:oracle:thin:@***********");
dataSource.setUsername("user");
dataSource.setPassword("password");
return dataSource;
@Override
public void setDataSource(DataSource dataSource)
@Bean
@StepScope
ItemReader<UserModel> dbReader() throws Exception
JdbcPagingItemReader<UserModel> reader = new JdbcPagingItemReader<UserModel>();
final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
sqlPagingQueryProviderFactoryBean.setSelectClause("select * ");
sqlPagingQueryProviderFactoryBean.setFromClause("from user");
sqlPagingQueryProviderFactoryBean.setWhereClause("where id>0");
sqlPagingQueryProviderFactoryBean.setSortKey("name");
reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
reader.setDataSource(dataSource);
reader.setPageSize(2500);
reader.setRowMapper(new BeanPropertyRowMapper<>(UserModel.class));
reader.afterPropertiesSet();
reader.setSaveState(true);
System.out.println("Reading users anonymized in chunks of "+ 2500);
return reader;
@Bean
public Dbwriter writer()
return new Dbwriter(); // I had another class for this
@Bean
public Step step1() throws Exception
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setMaxPoolSize(10);
taskExecutor.afterPropertiesSet();
return this.stepBuilderFactory.get("step1")
.<UserModel, UserModel>chunk(2500)
.reader(dbReader())
.writer(writer())
.taskExecutor(taskExecutor)
.build();
@Bean
public Job multithreadedJob() throws Exception
return this.jobBuilderFactory.get("multithreadedJob")
.start(step1())
.build();
@Bean
public PlatformTransactionManager getTransactionManager()
return new ResourcelessTransactionManager();
@Bean
public JobRepository getJobRepo() throws Exception
return new MapJobRepositoryFactoryBean(getTransactionManager()).getObject();
public static void main(String[] args)
SpringApplication.run(MultiThreadPagingApp.class, args);
您能帮我如何使用 Spring Batch 有效地读取所有记录,或者帮助我任何其他方法来处理这个问题。我尝试过这里提到的一种方法:http://techdive.in/java/jdbc-handling-huge-resultset 使用单线程应用程序读取和保存所有记录需要 120 分钟。由于春季批次最适合这种情况,我认为我们可以快速处理这种情况。
【问题讨论】:
为什么没有完成工作?是否抛出异常,我们需要更多信息 它不会抛出任何异常。它刚刚完成了我阅读三倍块大小的工作并停止了。这是日志:作业:[SimpleJob:[name=multithreadedJob]] 使用以下参数启动:[] INFO 15008 --- [main] osbatch.core.job.SimpleStepHandler :执行步骤:[step1] INFO 15008 --- [main] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=multithreadedJob]] 使用以下参数完成:[] 和以下状态:[COMPLETED] 【参考方案1】:您正在将JdbcPagingItemReader
上的saveState
标志设置为true(顺便说一句,它应该在调用afterPropertiesSet
之前设置)并在多线程步骤中使用此阅读器。但是,it is documented 在多线程上下文中将此标志设置为 false。
带有数据库读取器的多线程通常不是最佳选择,我建议在您的情况下使用分区。
【讨论】:
我已将 saveState 标志设置为 false 并在调用 afterPropertiesSet 之前移动了该标志。我仍然有同样的问题,它的读数只有块大小的三倍。并且,表中的数据无法根据某些条件分离数据,该表中也没有主键。如何使用分区方法从数据库中读取这些数据? 在 where 子句中有where id>0
,因此您可以创建一个基于此 ID 对数据进行分区的分区器。以ColumnRangePartitioner 为例。
我试过这样,不幸的是在我们的数据库中,我们没有定义这样的主键/唯一键。这个 ID 是从 1 到 10 的一些不同的数字。所有具有此编号的记录。就像每 1 到 4000 万条具有相同 ID 的记录一样。其中一些计数较少,其中一些记录更多。我没有该表的任何特定唯一列。不完全确定如何读取具有这种数据的完整表。除了 ID 值之外,所有其他列都可以接受空值。并且此 ID 值存在,但对于每一行来说不是唯一的。请帮帮我。
@MahmoudBenHassine - 即使我们使用Partitions
和JdbcPagingItemReader
给我的记录少于预期,我也会丢失近20 万条记录。以上是关于Spring批处理JdbcPagingItemReader无法读取所有事件的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Spring Cloud Task 动态部署独立 Spring 批处理
使用 RepositoryItemReader 和 Repository Item writer 的 spring 数据 JPA 的 spring 批处理