将文件上传为带有集成的春季批处理中的并行过程

Posted

技术标签:

【中文标题】将文件上传为带有集成的春季批处理中的并行过程【英文标题】:File upload as parallel process in spring batch with integration 【发布时间】:2018-09-25 06:03:00 【问题描述】:

我正在尝试使用带有集成的 Spring 批处理将多个文件上传到 SFTP 服务器。使用带有 threadPoolExecutorService 的 Future 并行上传多个文件。但我想与 spring 批处理配置并行执行 tasklet [不是我现在对未来任务所做的方式] 以及假设如果文件上传失败,我想在一定间隔内重试文件上传过程。

@Autowired
UploadGateway gateway;

@Bean
public Job importDataJob() 
    return jobBuilderFactory.get(FILE_UPLOAD_JOB_NAME).listener(jobExecutionListener(threadPoolTaskExecutor()))
            .incrementer(new RunIdIncrementer()).flow(uploadFiles())
            .end().build();



@Bean
public Step uploadFiles() 
    return stepBuilderFactory.get(UPLOAD_FILE_STEP_NAME).tasklet(new Tasklet() 

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception 
            log.info("Upload tasklet start executing..");

                resources = resourcePatternResolver.getResources(quantumRuntimeProperties.getInputFilePaths());

                for (Resource anInputResource : resources) 
                    log.info("Incoming file <> to upload....", anInputResource.getFilename());

                    Future<?> submit = threadPoolTaskExecutor().submit(new Callable<Boolean>() 

                        @Override
                        public Boolean call() throws Exception 
                            try 

                                gateway.upload(anInputResource.getFilename());

                             catch (Exception e) 

                            
                            return true;
                        
                    );
                    resultList.add(submit);
                

            return RepeatStatus.FINISHED;
        
    ).build();


@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) 
    return new JobExecutionListener() 
        private ThreadPoolTaskExecutor taskExecutor = executor;

        @Override
        public void beforeJob(JobExecution jobExecution) 
            // DO-NOTHING
        

        @Override
        public void afterJob(JobExecution jobExecution) 

            for (Future<?> future : resultList) 
                try 
                    // Wait for all the file uploads to complete
                    future.get();
                 catch (InterruptedException | ExecutionException e) 
                    log.error("Error occured while waiting for all files to get uploaded...");
                

            
            taskExecutor.shutdown();
        
    ;

【问题讨论】:

【参考方案1】:

我建议你看看ChunkMessageChannelItemWriterRemoteChunkHandlerFactoryBean - Spring Batch 与 Spring Integration 的集成。请参阅Reference Manual 了解更多信息。

【讨论】:

以上是关于将文件上传为带有集成的春季批处理中的并行过程的主要内容,如果未能解决你的问题,请参考以下文章

并行处理nodejs中的多个文件

从春季启动以角度4发布上传文件?

在itemwriter中,春季批处理未在运行时异常上滚动

春季批处理 - 传输文件

春季文件上传 - '所需的请求部分不存在'

sftp无法上传文件到linux处理过程