Spring批处理作业从多个来源读取

Posted

技术标签:

【中文标题】Spring批处理作业从多个来源读取【英文标题】:Spring batch Job read from multiple sources 【发布时间】:2014-02-13 18:37:44 【问题描述】:

如何从多个数据库中读取项目?我已经知道这是可能的文件。 以下示例适用于从多个文件中读取

...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
    <tasklet>
        <chunk reader="multiResourceReader" writer="flatFileItemWriter"
            commit-interval="1" />
    </tasklet>
    </step>
</job>
...
<bean id="multiResourceReader"
    class=" org.springframework.batch.item.file.MultiResourceItemReader">
    <property name="resources" value="file:csv/inputs/domain-*.csv" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>
...

三个这样的豆子。

<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="name" value="database2Reader" />
    <property name="dataSource" ref="dataSource2" />
    <property name="sql" value="select image from object where image like '%/images/%'" />
    <property name="rowMapper">
        <bean class="sym.batch.ImagesRowMapper2" />
    </property>
</bean>

【问题讨论】:

你能提供更多关于你的情况的信息吗?喜欢; - 您是否从相同的数据库类型(mysql、Oracle 或 DB2)中读取数据? - 你是否从不同的数据库实例中读取同一张表?如果答案是否定的,您如何将您的输入映射到同一个对象? 是不同表的 MySQL,我只想取一个字符串 【参考方案1】:

没有现成的组件可以满足您的要求;唯一的解决方案是编写一个自定义的ItemReader&lt;&gt; 委托给JdbcCursorItemReader(或HibernateCursorItemReader 或任何通用的ItemReader 实现)。 您需要准备所有必要的东西(数据源、会话、真正的数据库阅读器)并将所有委托阅读器绑定到您的自定义阅读器。

编辑: 您需要使用ItemReader.read() 的recusion 模拟一个循环,并在作业重新启动时维护阅读器和委托状态。

class MyItemReader<T> implements ItemReader<T>, ItemStream 
  private ItemReader[] delegates;
  private int delegateIndex;
  private ItemReader<T> currentDelegate;
  private ExecutionContext stepExecutionContext;

  public void setDelegates(ItemReader[] delegates) 
    this.delegates = delegates;
  

  @BeforeStep
  private void beforeStep(StepExecution stepExecution) 
    this.stepExecutionContext = stepExecution.getExecutionContext();
  

  public T read() 
    T item = null;
    if(null != currentDelegate) 
      item = currentDelegate.read();
      if(null == item) 
        ((ItemStream)this.currentDelegate).close();
        this.currentDelegate = null;
      
    
    // Move to next delegate if previous was exhausted!
    if(null == item && this.delegateIndex< this.delegates.length) 
      this.currentDelegate = this.delegates[this.currentIndex++];
      ((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
      update(this.stepExecutionContext);
      // Recurse to read() to simulate loop through delegates
      item = read();
    
    return item;
  

  public void open(ExecutionContext ctx) 
    // During open restore last active reader and restore its state
    if(ctx.containsKey("index")) 
      this.delegateIndex = ctx.getInt("index");
      this.currentDelegate = this.delegates[this.delegateIndex];
      ((ItemStream)this.currentDelegate ).open(ctx);
    
  

  public void update(ExecutionContext ctx) 
    // Update current delegate index and state
    ctx.putInt("index", this.delegateIndex);
    if(null != this.currentDelegate) 
      ((ItemStream)this.currentDelegate).update(ctx);
    
  

  public void close(ExecutionContext ctx) 
    if(null != this.currentDelegate) 
      ((ItemStream)this.currentDelegate).close();
  


<bean id="myItemReader" class=path.to.MyItemReader>
  <property name="delegates">
    <array>
      <ref bean="itemReader1"/>
      <ref bean="itemReader2"/>
      <ref bean="itemReader3"/>
    </array>
  </property>
</bean>

EDIT2:记得设置属性name;这是让 MyItemReader.read() 正常工作的必要条件

<bean id="itemReader1" class="JdbcCursorItemReader">
  <property name="name" value="itemReader1" />
  <!-- Set other properties -->
</bean>

【讨论】:

【参考方案2】:

我建议一个简单的解决方法,它可能并不适合所有情况,但在许多情况下都会有用:

简单定义:

2 个阅读器,每个数据库一个 2 步 一个包含两个步骤的作业

这两个步骤几乎相同,它们引用相同的处理器和写入器,但它们具有不同的读取器。它们将被连续调用。

此设置是否有效取决于处理器和编写器(在不同步骤中调用时它们是否仍能正常工作)。就我而言,将appendAllowed=true 设置为编写器就足够了,这样两个步骤都可以写入同一个文件。

【讨论】:

【参考方案3】:

我建议一个棘手的方法。如果我们假设一个是您的 mysql 数据源的表是基础的,并且该表中的每一行都对应于其他 mysql 数据源表的行(如不同数据源中的连接表),您可以在批处理作业 itemreader 中执行此操作。这种方式的前;

Spring 数据源配置;

<bean id="mySqlDataSource1" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="driverClassName" value="$database1.driverClassName"/>
    <property name="url" value="$database1.url"/>
    <property name="username" value="$database1.username"/>
    <property name="password" value="$database1.password"/>
    <property name="validationQuery" value="$database1.validationQuery"/>
</bean>

<bean id="mySqlDataSource2" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="driverClassName" value="$database2.driverClassName"/>
    <property name="url" value="$database2.url"/>
    <property name="username" value="$database2.username"/>
    <property name="password" value="$database2.password"/>
    <property name="validationQuery" value="$database2.validationQuery"/>
</bean>

您的批处理作业.xml

<bean id="multiDatasorceReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="mySqlDataSource1" />
    <property name="rowMapper" ref="multiDatasourceRowMapper" />
    <property name="sql">
        <value>
            SELECT * FROM xyz
        </value>
    </property>
</bean>

<bean id="multiDatasourceRowMapper" class="yourpackage.MultiDatasourceRowMapper" scope="step">
    <property name="secondDataSource" ref="mySqlDataSource2" />
    <property name="secondSql">
        <value>
            SELECT * FROM abc
        </value>
    </property>
</bean>

你的 RowMapper 看起来像;

public class MultiDatasourceRowMapper implements RowMapper<String> 

    private DataSource secondDataSource;

    private String secondSql;

    public String mapRow(ResultSet rs, int arg1) throws SQLException 
        Connection conn = secondDataSource.getConnection();
        PreparedStatement prep = conn.prepareStatement(secondSql); 

        // Do Something

        return "";
    

    public void setSecondDataSource(DataSource secondDataSource) 
        this.secondDataSource = secondDataSource;
    

    public void setSecondSql(String secondSql) 
        this.secondSql = secondSql;
    


【讨论】:

以上是关于Spring批处理作业从多个来源读取的主要内容,如果未能解决你的问题,请参考以下文章

测试spring批处理作业stepScope

Spring Batch - 从 S3 读取多个文件

具有多个数据源的春季批处理junit。 spring data jpa无法将数据保存在内存数据库中

Spring Batch中如何读取多个CSV文件合并数据进行处理?

如果从计划的作业中调用,Spring Boot 存储库不会保存到数据库

Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次