Spring Batch integration
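Posted: 2014-05-18 15:14:59
Question: I am looking for guidance on, or a solution for, Spring Batch integration. An external application sends XML files to a directory. My application should read the content of each file and then move the file to another directory.
The application should be able to process files in parallel.
Thanks in advance.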
Comments:
You may find something useful at this link: ***.com/questions/19347084/…
Answer 1:
You can use Spring Integration FTP/SFTP together with Spring Batch:
1. Spring Integration FTP configuration:
<bean id="ftpClientFactory"
      class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="${host.name}" />
    <property name="port" value="${host.port}" />
    <property name="username" value="${host.username}" />
    <property name="password" value="${host.password}" />
    <property name="bufferSize" value="100000" />
</bean>
<int:channel id="ftpChannel" />
<int-ftp:outbound-channel-adapter id="ftpOutbound"
      channel="ftpChannel" remote-directory="/yourremotedirectory/"
      session-factory="ftpClientFactory" use-temporary-file-name="false" />
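2. Create your reader and autowire the service that supplies your items when needed:
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;

@Scope("step")
public class MajorItemReader implements InitializingBean {
    private List<YourItem> yourItems = null;
    @Autowired
    private MyService provider;

    // Returns the next item, or null when no items are left
    public YourItem read() {
        if ((yourItems != null) && (yourItems.size() != 0)) {
            return yourItems.remove(0);
        }
        return null;
    }

    // Reading items from the service
    private void reloadItems() {
        this.yourItems = new ArrayList<YourItem>();
        // use the service to provide your items
        if (yourItems.isEmpty()) {
            yourItems = null;
        }
    }

    public MyService getProvider() {
        return provider;
    }

    public void setProvider(MyService provider) {
        this.provider = provider;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        reloadItems();
    }
}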
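The reader above depends on one contract: read() hands out one item per call and returns null once nothing is left, which is how the step learns that the input is exhausted. A minimal plain-Java sketch of that contract follows. QueueBackedReader is a hypothetical name, not a Spring Batch class, and it uses an ArrayDeque in place of List.remove(0) purely as an illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a list-backed reader; not part of Spring Batch. */
final class QueueBackedReader<T> {
    private final Deque<T> items;

    QueueBackedReader(List<T> source) {
        // Copy the input so that draining the reader does not mutate the caller's list.
        this.items = new ArrayDeque<>(source);
    }

    /** Returns the next item, or null once every item has been handed out. */
    synchronized T read() {
        return items.pollFirst();
    }
}
```

Making read() synchronized is an assumption that matters only if several threads share one reader instance.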
3. Create your own item processor:
import org.springframework.batch.item.ItemProcessor;

public class MyProcessor implements ItemProcessor<YourItem, YourItem> {
    @Override
    public YourItem process(YourItem item) throws Exception {
        // Apply any logic to your item before transferring it to the writer
        return item;
    }
}
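4. Create your own writer:
// Import packages below assume Spring Integration 4.x or later
import java.io.File;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
public class MyWriter {
    @Autowired
    @Qualifier("ftpChannel")
    private MessageChannel messageChannel;
    public void write(YourItem pack) throws IOException {
        // Create your file from your item
        File file = new File("the_created_file");
        // Send the file via the Spring Integration FTP channel
        Message<File> message = MessageBuilder.withPayload(file).build();
        messageChannel.send(message);
    }
}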
5. Batch configuration:
<bean id="dataSourcee"
      class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="" />
    <property name="url" value="" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>
<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSourcee" />
    <property name="transactionManager" ref="transactionManagerrr" />
    <property name="databaseType" value="" />
</bean>
<bean id="transactionManagerrr"
      class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>
6. Another ApplicationContext file to configure your job:
<context:annotation-config />
<bean id="provider" class="mypackage.MyService" />
<context:component-scan base-package="mypackage" />
<bean id="myReader" class="mypackage.MajorItemReader">
    <property name="provider" ref="provider" />
</bean>
<bean id="myWriter" class="mypackage.MyWriter" />
<bean id="myProcessor" class="mypackage.MyProcessor" />
<bean id="mReader"
      class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="myReader" />
    <property name="targetMethod" value="read" />
</bean>
<bean id="mProcessor"
      class="org.springframework.batch.item.adapter.ItemProcessorAdapter">
    <property name="targetObject" ref="myProcessor" />
    <property name="targetMethod" value="process" />
</bean>
<bean id="mWriter"
      class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="myWriter" />
    <property name="targetMethod" value="write" />
</bean>
<batch:job id="myJob">
    <batch:step id="step01">
        <batch:tasklet>
            <batch:chunk reader="mReader" writer="mWriter"
                         processor="mProcessor" commit-interval="1">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
<bean id="myJobLauncher" class="mypackage.MyJobLauncher" />
<task:scheduled-tasks>
    <task:scheduled ref="myJobLauncher" method="run"
                    cron="0 0/5 * * * ?" />
    <!-- this makes the job run every 5 minutes -->
</task:scheduled-tasks>
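To make the effect of commit-interval in the chunk element more concrete, here is a simplified plain-Java model of a chunk-oriented step: items are read one at a time, processed, and handed to the writer in groups of at most commit-interval items. ChunkLoopSketch and its run method are hypothetical names for illustration only. Spring Batch's real loop also manages transactions, restarts, retries and skips, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified, hypothetical model of a chunk-oriented step; not Spring Batch code. */
final class ChunkLoopSketch {

    /**
     * Reads until the source is exhausted, processes each item, and passes the
     * results to the writer in groups of at most commitInterval items.
     * Returns the number of chunks handed to the writer.
     */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int commitInterval) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be at least 1");
        }
        int chunks = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == commitInterval) {
                writer.accept(new ArrayList<>(chunk)); // one "commit" per full chunk
                chunk.clear();
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // final, partially filled chunk
            chunks++;
        }
        return chunks;
    }
}
```

With commit-interval="1", as in the configuration above, every item forms its own chunk, so a failure affects only the item being written at that moment, at the cost of one commit per item.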
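7. Finally, configure a launcher to start your job:
import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class MyJobLauncher {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    @Qualifier("myJob")
    private Job job;

    public void run() {
        try {
            // A fresh "date" parameter makes every scheduled run a new job instance
            String dateParam = new Date().toString();
            JobParameters param = new JobParametersBuilder()
                    .addString("date", dateParam).toJobParameters();
            // SimpleJobLauncher is synchronous by default, so run() returns after the job ends
            JobExecution execution = jobLauncher.run(job, param);
            System.out.println("Job finished with status: " + execution.getStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}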
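The steps above send files over FTP, while the question asks for local files to be read, moved to another directory, and handled in parallel. As a rough illustration of that requirement only, the following plain-Java sketch uses java.nio.file and a fixed thread pool instead of Spring Batch or Spring Integration. ParallelXmlMover, handle and processAll are hypothetical names, reading the file as a UTF-8 string stands in for real XML parsing, and Java 11 or later is assumed for Files.readString.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: read each XML file, then move it, using a thread pool. */
final class ParallelXmlMover {

    /** Reads one file and moves it into doneDir; returns the content length in characters. */
    static int handle(Path file, Path doneDir) {
        try {
            String content = Files.readString(file, StandardCharsets.UTF_8); // placeholder for XML parsing
            Files.move(file, doneDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
            return content.length();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Submits every *.xml file in inputDir to the pool, waits for all, returns the file count. */
    static int processAll(Path inputDir, Path doneDir, int threads)
            throws IOException, InterruptedException, ExecutionException {
        Files.createDirectories(doneDir);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            try (DirectoryStream<Path> xmlFiles = Files.newDirectoryStream(inputDir, "*.xml")) {
                for (Path file : xmlFiles) {
                    results.add(pool.submit(() -> handle(file, doneDir)));
                }
            }
            for (Future<Integer> result : results) {
                result.get(); // rethrows any failure raised in a worker thread
            }
            return results.size();
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch does not guard against files that the external application is still writing. In practice the sender would need some convention, such as writing under a temporary name and renaming on completion, before a file is picked up.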
Comments:
Thanks, Rafik, for the detailed answer.