Multiprocessing in Spring Integration

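I have a Spring Integration service that needs to read hundreds of XML files from an S3 bucket, process them, and generate output. I am using S3InboundFileSynchronizingMessageSource together with a custom implementation of AbstractInboundFileSynchronizer.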

@Bean(name = "s3FileSource")
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedRate = "3000"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
    S3InboundFileSynchronizingMessageSource messageSource =
            new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
    messageSource.setAutoCreateLocalDirectory(true);
    messageSource.setLocalDirectory(new File(inboundDir));
    messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return messageSource;
}

@Bean
public CustomAbstractInboundFileSynchronizer s3InboundFileSynchronizer() {
    CustomAbstractInboundFileSynchronizer synchronizer = new CustomAbstractInboundFileSynchronizer(new S3SessionFactory(amazonS3));
    synchronizer.setDeleteRemoteFiles(true);
    synchronizer.setPreserveTimestamp(true);
    synchronizer.setRemoteDirectory(s3BucketName.concat("/").concat(s3InboundFolder));
    synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.xml\\.{0,1}\\d{0,2}"));
    Expression expression = PARSER.parseExpression("#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this");
    synchronizer.setLocalFilenameGeneratorExpression(expression);
    return synchronizer;
}

@Bean
public DirectChannel s3Channel() {
    return new DirectChannel();
}

I want to process these messages concurrently instead of one at a time. The integration flow after this adapter is the same for all messages.

@Bean
public IntegrationFlow inboundSubFlow() {
    return IntegrationFlows
            .from("s3Channel")
            .split()
            .transform(transformer)
            .channel(requestChannel())
            .get();
}

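I tried using the splitter, split(), but it does not seem to help. I also tried a ThreadPoolTaskExecutor, but whenever I use it I intermittently get an error saying the S3 key is invalid, and files are left in the inbound directory with a .writing extension.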

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: 5A96A2596B6504D6), S3 Extended Request ID: a757llOA1GbivlaRu1Pf41Sz8XLL52WrgbWCa1PnAzanhyEMCwwR3Zx1H/uytjrEJsrh0Yj8M80=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1378)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:924)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:702)
at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:454)
at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:416)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:365)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3995)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1291)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1166)
at org.springframework.integration.aws.support.S3Session.read(S3Session.java:153)
at com.application.service.sample.CustomAbstractInboundFileSynchronizer.copyFileToLocalDirectory(CustomAbstractInboundFileSynchronizer.java:124)

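One more thing: each XML file is moved to a different "folder" in S3 after it is processed. How can I process multiple XML files concurrently without any errors?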

Adding the CustomAbstractInboundFileSynchronizer:

@Override
protected void copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile, File localDirectory,
                                        Session<S3ObjectSummary> session) throws IOException {
    String remoteFileName = this.getFilename(remoteFile);
    //String localFileName = this.generateLocalFileName(remoteFileName);
    String localFileName = remoteFileName;
    String remoteFilePath = remoteDirectoryPath != null
            ? (remoteDirectoryPath + remoteFileName)
            : remoteFileName;
    if (!this.isFile(remoteFile)) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("cannot copy, not a file: " + remoteFilePath);
        }
        return;
    }

    File localFile = new File(localDirectory, localFileName);
    if (!localFile.exists()) {
        String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
        File tempFile = new File(tempFileName);
        OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
        try {
            session.read(remoteFilePath, outputStream);
        }
        catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            else {
                throw new MessagingException("Failure occurred while copying from remote to local directory", e);
            }
        }
        finally {
            try {
                outputStream.close();
            }
            catch (Exception ignored2) {
            }
        }

        if (tempFile.renameTo(localFile)) {
            if (this.deleteRemoteFiles) {
                session.remove(remoteFilePath);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("deleted " + remoteFilePath);
                }
            }
        }
        if (this.preserveTimestamp) {
            localFile.setLastModified(getModified(remoteFile));
        }
    }
}

Answer

To process messages in parallel, you must use an ExecutorChannel.
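As a minimal sketch of that suggestion, the DirectChannel bean named s3Channel from the question could be swapped for an ExecutorChannel backed by a thread pool, so each message is handed to a pool thread instead of being handled on the poller thread. The class name ParallelChannelConfig, the bean name xmlProcessingExecutor, and the pool size of 4 are assumptions made for illustration; they do not come from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class; all names and sizes here are illustrative.
@Configuration
public class ParallelChannelConfig {

    // Thread pool that will run the downstream handlers.
    @Bean
    public ThreadPoolTaskExecutor xmlProcessingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);   // assumed size, tune for the workload
        executor.setMaxPoolSize(4);
        executor.setThreadNamePrefix("xml-worker-");
        return executor;
    }

    // Replaces the DirectChannel: every send() is dispatched through the
    // executor, so subscribers run on pool threads rather than the sender's.
    @Bean
    public MessageChannel s3Channel() {
        return new ExecutorChannel(xmlProcessingExecutor());
    }
}
```

With this in place the poller thread only hands messages off, and the rest of the flow runs on the xml-worker threads. An inbound channel adapter emits one message per poll by default, so the poller in the question would probably also need a larger maxMessagesPerPoll (for example `@Poller(fixedRate = "3000", maxMessagesPerPoll = "10")`) before several files are actually in flight at once.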

To split the file content line by line, you must use .split(Files.splitter()). See its JavaDocs:

/**
 * The {@link AbstractMessageSplitter} implementation to split the {@link File} Message
 * payload to lines.
 * <p>
 * With {@code iterator = true} (defaults to {@code true}) this class produces an
 * {@link Iterator} to process file lines on demand from {@link Iterator#next}. Otherwise
 * a {@link List} of all lines is returned to the to further
 * {@link AbstractMessageSplitter#handleRequestMessage} process.
 * <p>
 * Can accept {@link String} as file path, {@link File}, {@link Reader} or
 * {@link InputStream} as payload type. All other types are ignored and returned to the
 * {@link AbstractMessageSplitter} as is.
 * <p>
 * If {@link #setFirstLineAsHeader(String)} is specified, the first line of the content is
 * treated as a header and carried as a header with the provided name in the messages
 * emitted for the remaining lines. In this case, if markers are enabled, the line count
 * in the END marker does not include the header line and, if
 * {@link #setApplySequence(boolean) applySequence} is true, the header is not included in
 * the sequence.
 *
 * @author Artem Bilan
 * @author Gary Russell
 * @author Ruslan Stelmachenko
 *
 * @since 4.1.2
 */
public class FileSplitter extends AbstractMessageSplitter {
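For illustration, a flow that uses this splitter in place of the bare split() from the question might look roughly like the sketch below. It assumes the Spring Integration 5.x Java DSL packages (later versions may expose a different entry point than IntegrationFlows). The class name, the bean name, and the printing handler are hypothetical stand-ins for the question's own transformer and requestChannel().

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.dsl.Files;

// Hypothetical configuration class, meant to replace the question's flow
// rather than to run alongside it on the same channel.
@Configuration
public class LineSplittingFlowConfig {

    @Bean
    public IntegrationFlow lineSplittingFlow() {
        return IntegrationFlows
                .from("s3Channel")
                // FileSplitter: emits one message per line of the File payload
                .split(Files.splitter())
                // Placeholder endpoint (assumption): print each line
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }
}
```

The no-argument split() in the question only splits payloads that are already collections, arrays, or similar, which may be why it had no visible effect on a single File payload.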

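All of your other questions deserve to be asked separately. It is hard to have a discussion here, and there are limits on the size of comments and answers.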
