Spring批处理JdbcPagingItemReader无法读取所有事件

Posted

技术标签:

【中文标题】Spring批处理JdbcPagingItemReader无法读取所有事件【英文标题】:Spring batch JdbcPagingItemReader not able to read all events 【发布时间】:2019-05-31 09:26:35 【问题描述】:

我有如下弹簧批处理应用程序(表名和查询被编辑为一些通用名称)

当我执行这个程序时,它能够读取 7500 个事件,即 3 倍的块大小,并且无法读取 oracle 数据库中的剩余记录。我有一个包含 5000 万条记录的表,并且能够复制到另一个 noSql 数据库。

@EnableBatchProcessing
@SpringBootApplication
@EnableAutoConfiguration
public class MultiThreadPagingApp extends DefaultBatchConfigurer

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
public DataSource dataSource;

@Bean
public DataSource dataSource() 
    final DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
    dataSource.setUrl("jdbc:oracle:thin:@***********");
    dataSource.setUsername("user");
    dataSource.setPassword("password");

    return dataSource;



@Override
public void setDataSource(DataSource dataSource) 

@Bean
@StepScope
ItemReader<UserModel> dbReader() throws Exception 

    JdbcPagingItemReader<UserModel> reader = new JdbcPagingItemReader<UserModel>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();        
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select * ");
    sqlPagingQueryProviderFactoryBean.setFromClause("from user");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where id>0");
    sqlPagingQueryProviderFactoryBean.setSortKey("name");
    reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    reader.setDataSource(dataSource);
    reader.setPageSize(2500);       
    reader.setRowMapper(new BeanPropertyRowMapper<>(UserModel.class));
    reader.afterPropertiesSet();
    reader.setSaveState(true);
    System.out.println("Reading users anonymized in chunks of "+ 2500);
    return reader;



@Bean
public Dbwriter writer() 
    return new Dbwriter(); // I had another class for this
   

@Bean
public Step step1() throws Exception 
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(4);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.afterPropertiesSet();

    return this.stepBuilderFactory.get("step1")
            .<UserModel, UserModel>chunk(2500)
            .reader(dbReader())
            .writer(writer())
            .taskExecutor(taskExecutor)
            .build();



@Bean
public Job multithreadedJob() throws Exception 
    return this.jobBuilderFactory.get("multithreadedJob")
            .start(step1())
            .build();
 


@Bean
public PlatformTransactionManager getTransactionManager() 
    return new ResourcelessTransactionManager();


@Bean
public JobRepository getJobRepo() throws Exception 
    return new MapJobRepositoryFactoryBean(getTransactionManager()).getObject();


public static void main(String[] args) 
    SpringApplication.run(MultiThreadPagingApp.class, args);

您能帮我如何使用 Spring Batch 有效地读取所有记录,或者帮助我任何其他方法来处理这个问题。我尝试过这里提到的一种方法:http://techdive.in/java/jdbc-handling-huge-resultset 使用单线程应用程序读取和保存所有记录需要 120 分钟。由于春季批次最适合这种情况,我认为我们可以快速处理这种情况。

【问题讨论】:

为什么没有完成工作?是否抛出异常,我们需要更多信息 它不会抛出任何异常。它刚刚完成了我阅读三倍块大小的工作并停止了。这是日志:作业:[SimpleJob:[name=multithreadedJob]] 使用以下参数启动:[] INFO 15008 --- [main] osbatch.core.job.SimpleStepHandler :执行步骤:[step1] INFO 15008 --- [main] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=multithreadedJob]] 使用以下参数完成:[] 和以下状态:[COMPLETED] 【参考方案1】:

您正在将JdbcPagingItemReader 上的saveState 标志设置为true(顺便说一句,它应该在调用afterPropertiesSet 之前设置)并在多线程步骤中使用此阅读器。但是,it is documented 在多线程上下文中将此标志设置为 false。

带有数据库读取器的多线程通常不是最佳选择,我建议在您的情况下使用分区。

【讨论】:

我已将 saveState 标志设置为 false 并在调用 afterPropertiesSet 之前移动了该标志。我仍然有同样的问题,它的读数只有块大小的三倍。并且,表中的数据无法根据某些条件分离数据,该表中也没有主键。如何使用分区方法从数据库中读取这些数据? 在 where 子句中有where id&gt;0,因此您可以创建一个基于此 ID 对数据进行分区的分区器。以ColumnRangePartitioner 为例。 我试过这样,不幸的是在我们的数据库中,我们没有定义这样的主键/唯一键。这个 ID 是从 1 到 10 的一些不同的数字。所有具有此编号的记录。就像每 1 到 4000 万条具有相同 ID 的记录一样。其中一些计数较少,其中一些记录更多。我没有该表的任何特定唯一列。不完全确定如何读取具有这种数据的完整表。除了 ID 值之外,所有其他列都可以接受空值。并且此 ID 值存在,但对于每一行来说不是唯一的。请帮帮我。 @MahmoudBenHassine - 即使我们使用PartitionsJdbcPagingItemReader 给我的记录少于预期,我也会丢失近20 万条记录。

以上是关于Spring批处理JdbcPagingItemReader无法读取所有事件的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Spring Cloud Task 动态部署独立 Spring 批处理

使用 RepositoryItemReader 和 Repository Item writer 的 spring 数据 JPA 的 spring 批处理

spring batch(批处理)

spring的后置处理器

Spring Batch之批处理实践

spring的bean工厂后置处理器