Spring批处理集成

Posted

技术标签:

【中文标题】Spring批处理集成【英文标题】:Spring batch integration 【发布时间】:2014-05-18 15:14:59 【问题描述】:

我正在寻找有关 Spring 批处理集成的指导/解决方案。我有一个外部应用程序将向其发送 xml 文件的目录。我的应用程序应该读取文件内容并将文件移动到另一个目录。

应用程序应该能够并行处理文件。

提前致谢。

【问题讨论】:

这里...在这个链接中你可能会发现一些有用的东西***.com/questions/19347084/… 【参考方案1】:

你可以使用 Spring Integration ftp / sftp 结合 Spring Batch:

1.Spring集成Ftp配置:

<bean id="ftpClientFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="$host.name" />
    <property name="port" value="$host.port" />
    <property name="username" value="$host.username" />
    <property name="password" value="$host.password" />
    <property name="bufferSize" value="100000"/>
</bean>
<int:channel id="ftpChannel" />
<int-ftp:outbound-channel-adapter id="ftpOutbound"
    channel="ftpChannel" remote-directory="/yourremotedirectory/" session-factory="ftpClientFactory" use-temporary-file-name="false" />

2.创建您的阅读器并自动连接服务以在需要时提供您的项目:

 @Scope("step")
 public class MajorItemReader implements InitializingBean

        private List<YourItem> yourItems= null;

        @Autowired
        private MyService provider;


        public YourItem read() 
            if ((yourItems!= null) && (yourItems.size() != 0)) 
                return yourItems.remove(0);
            
            return null;
        

        //Reading Items from Service
        private void reloadItems() 

        this.yourItems= new ArrayList<YourItem>();
        // use the service to provide your Items
       if (yourItems.isEmpty()) 
                yourItems= null;
            
        
        public MyService getProvider() 
            return provider;
        
        public void setProvider(MyService provider) 
            this.provider = provider;
        
        @Override
        public void afterPropertiesSet() throws Exception 
            reloadItems();
        

3。创建您自己的项目处理器

     public class MyProcessor implements
    ItemProcessor<YourItem, YourItem> 
    @Override
    public YourItem process(YourItem arg0) throws Exception 
    // Apply any logic to your Item before transferring it to the writer
    return arg0;
    
    

4。创建自己的作家:

   public class MyWriter
   @Autowired
   @Qualifier("ftpChannel")
   private MessageChannel messageChannel;
   public void write(YourItem pack) throws IOException 
   //create your file and from your Item 
   File file = new File("the_created_file");
   // Sending the file via Spring Integration Ftp Channel
   Message<File> message = MessageBuilder.withPayload(file).build();
   messageChannel.send(message);
   

5.批量配置:

<bean id="dataSourcee"
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="" />
    <property name="url" value="" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>
<bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSourcee" />
    <property name="transactionManager" ref="transactionManagerrr" />
    <property name="databaseType" value="" />
</bean>
<bean id="transactionManagerrr"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

6.另一个 ApplicationContext 文件来配置你的工作:

<context:annotation-config />
<bean id="provider" class="mypackage.MyService" />
<context:component-scan base-package="mypackage" />
<bean id="myReader" class="mypackage.MyReader"
    <property name="provider" ref="provider" />
 </bean>
<bean id="myWriter" class="mypackage.MyWriter" />
<bean id="myProcessor" class="mypackage.MyProcessor" />
   <bean id="mReader"
    class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="myReader" />
    <property name="targetMethod" value="read" />
</bean>
<bean id="mProcessor"
    class="org.springframework.batch.item.adapter.ItemProcessorAdapter">
    <property name="targetObject" ref="myProcessor" />
    <property name="targetMethod" value="process" />
</bean>
<bean id="mWriter"
    class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="myWriter" />
    <property name="targetMethod" value="write" />
</bean>
<batch:job id="myJob">
    <batch:step id="step01">
        <batch:tasklet>
            <batch:chunk reader="mReader" writer="mWriter"
                processor="mProcessor" commit-interval="1">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
<bean id="myRunScheduler" class="mypackage.MyJobLauncher" />
<task:scheduled-tasks>
    <task:scheduled ref="myJobLauncher" method="run"
        cron="0 0/5 * * * ?" />
    <!-- this will maker the job runs every 5 minutes -->
</task:scheduled-tasks>

7.最后配置一个启动器来启动你的工作:

public class MyJobLauncher 
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("myJob")
private Job job;
public void run() 
    try 
        String dateParam = new Date().toString();
        JobParameters param = new JobParametersBuilder().addString("date",
                dateParam).toJobParameters();
        JobExecution execution = jobLauncher.run(job, param);
        execution.stop();
     catch (Exception e) 
        e.printStackTrace();
    

【讨论】:

感谢 Rafik,提供详细信息。

以上是关于Spring批处理集成的主要内容,如果未能解决你的问题,请参考以下文章

Spring批处理和集成

当所有其他消息完成处理时,Spring集成处理消息

处理后的Spring集成移动文件

Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次

Spring集成消息处理链的用法?

Spring集成流程中的错误处理实践