Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次
Posted
技术标签:
【中文标题】Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次【英文标题】:Spring batch job should be executed only once after the spring integration file poller polls the files 【发布时间】:2019-11-24 06:50:51 【问题描述】:我正在尝试从可能出现一个或多个文件的系统文件夹中轮询文件,对于这些文件,我必须只触发一次批处理作业,而不是次数等于文件夹中的文件数。在我的情况下,我的批处理一次处理多个文件,我只希望轮询器将信号发送到批处理一次以开始其工作。
尝试 poller.maxMessagesPerPoll(1) 等,但它有所不同。我面临的问题是,批处理作业被触发等于轮询文件夹中的文件数。我只想执行一次批处理
@Bean
public FileMessageToJobRequest fileMessageToJobRequest()
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setJob(fileMessageBatchJob);
return fileMessageToJobRequest;
@Bean
public JobLaunchingGateway jobLaunchingGateway()
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway)
System.out.println("&&&&&&&&&&&&&&&&&&Inside Integration Flow!!!!");
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\\apps_data\\recv")),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
.filter(onlyT4F2())
.handle(fileMessageToJobRequest)
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").get();
@Bean
public GenericSelector<File> onlyT4F2()
System.out.println("@@@@@@@Inside GenericSelector of XXX");
return new GenericSelector<File>()
@Override
public boolean accept(File source)
return source.getName().contains("XXX");
;
当前行为 - 当轮询器在给定位置检测到文件时,配置的批处理作业会触发多次。如果文件为 4,则批处理作业触发 4 次。
预期行为 - 文件轮询后,批处理作业应该只对任意数量的文件执行一次。因为批处理作业一次处理多个文件,所以不需要多次执行。
如果您需要我提供的任何其他信息,请告诉我。请优先提供帮助
【问题讨论】:
Spring 批处理作业从轮询位置显式读取所有文件,所以这里我只是使用轮询器来观察文件的特定格式是否即将到来然后触发批处理作业.. 【参考方案1】:您可以在只返回一个文件的入站通道适配器上使用自定义FileListFilter
。
.filter(myFilterThatOnlyReturnsOneFile)
编辑
public class OnlyOneFileListFilter implements FileListFilter<File>
@Override
public List<File> filterFiles(File[] files)
return Collections.singletonList(files[0]);
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\\apps_data\\recv"))
.filter(new OnlyOneFileListFilter()),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
...
【讨论】:
能否请您给我代码 sn-p.. 我理解您的逻辑,但对如何适应集成流程感到困惑.. Russel - 感谢您的解决方案,我尝试了但低于异常,并且它看起来过滤器方法接受布尔值,请您帮忙引起:java.lang.IllegalArgumentException:需要布尔结果:类java.util.Collections$SingletonList 在 org.springframework.util.Assert.assignableCheckFailed(Assert.java:655) 在 org.springframework.util.Assert.isAssignable(Assert.java:586) 在 org.springframework.integration.filter。 MessageFilter.doHandleRequestMessage(MessageFilter.java:165) 您已将.filter()
应用于主要流程,而不是像我在回答中显示的那样应用于适配器。这不是像 onlyT4F2
这样的消息过滤器,它是通道适配器的属性。【参考方案2】:
@Gary Russell - 问题已解决,仅使用 GenericSelector,如下所示。谢谢你的帮助。在第一次运行时触发批处理作业后,它会处理所有当前文件并将其移动到其他文件夹,因此我添加了 file.exists() 并且它按照我的期望运行良好。但我观察到,在 1 小时后或有时即使在提供预期文件后轮询也没有发生,需要您的帮助/建议。
@Bean
public GenericSelector<File> triggerJobOnlyOnce()
return new GenericSelector<File>()
@Override
public boolean accept(File source)
if(source.getName().contains("XXX") && source.exists())
return true;
return flag;
;
【讨论】:
以上是关于Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次的主要内容,如果未能解决你的问题,请参考以下文章
Spring中的@SendToUser是不是仅在集成了Spring Security时才有效?
带有集成测试的 Spring Boot 应用程序的 Jenkins 作业