Spring集成中的多处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring集成中的多处理相关的知识,希望对你有一定的参考价值。
我有一个Spring Integration服务,它需要从S3存储桶中读取数百个XML文件并处理它们并生成输出。我正在使用S3InboundFileSynchronizingMessageSource
以及AbstractInboundFileSynchronizer
的自定义实现。
@Bean(name = "s3FileSource")
@InboundChannelAdapter(value = "s3Channel" , poller = @Poller(fixedRate = "3000"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource()
S3InboundFileSynchronizingMessageSource messageSource =
new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new File(inboundDir));
messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
return messageSource;
@Bean
public CustomAbstractInboundFileSynchronizer s3InboundFileSynchronizer()
CustomAbstractInboundFileSynchronizer synchronizer = new CustomAbstractInboundFileSynchronizer(new S3SessionFactory(amazonS3));
synchronizer.setDeleteRemoteFiles(true);
synchronizer.setPreserveTimestamp(true);
synchronizer.setRemoteDirectory(s3BucketName.concat("/").concat(s3InboundFolder));
synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.xml\\.0,1\\d0,2"));
Expression expression = PARSER.parseExpression("#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this");
synchronizer.setLocalFilenameGeneratorExpression(expression);
return synchronizer;
@Bean
public DirectChannel s3Channel()
return new DirectChannel();
我想要同时处理这些消息,而不是一次处理一个消息。此适配器之后的集成流程对于所有消息都是相同的。
@Bean
public IntegrationFlow inboundSubFlow()
return IntegrationFlows
.from("s3Channel")
.split()
.transform(transformer)
.channel(requestChannel())
.get();
我尝试使用Splitter split()
,但似乎没有帮助。我也尝试过使用ThreadPooltaskExecutor
,但每当我使用它时,我间歇性地不断收到错误消息,说S3 key is invalid
,它将文件留在入站目录中,并带有.writing
扩展名。
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: 5A96A2596B6504D6), S3 Extended Request ID: a757llOA1GbivlaRu1Pf41Sz8XLL52WrgbWCa1PnAzanhyEMCwwR3Zx1H/uytjrEJsrh0Yj8M80=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1378)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:924)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:702)
at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:454)
at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:416)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:365)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3995)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1291)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1166)
at org.springframework.integration.aws.support.S3Session.read(S3Session.java:153)
at com.application.service.sample.CustomAbstractInboundFileSynchronizer.copyFileToLocalDirectory(CustomAbstractInboundFileSynchronizer.java:124)
还有一件事,每个xml文件在处理后被移动到S3中的不同“文件夹”。如何同时处理多个xml文件而没有任何错误?
添加CustomAbstractInboundFileSynchronizer
:
@Override
protected void copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile, File localDirectory,
Session<S3ObjectSummary> session) throws IOException
String remoteFileName = this.getFilename(remoteFile);
//String localFileName = this.generateLocalFileName(remoteFileName);
String localFileName = remoteFileName;
String remoteFilePath = remoteDirectoryPath != null
? (remoteDirectoryPath + remoteFileName)
: remoteFileName;
if (!this.isFile(remoteFile))
if (this.logger.isDebugEnabled())
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
return;
File localFile = new File(localDirectory, localFileName);
if (!localFile.exists())
String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
File tempFile = new File(tempFileName);
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
try
session.read(remoteFilePath, outputStream);
catch (Exception e)
if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
finally
try
outputStream.close();
catch (Exception ignored2)
if (tempFile.renameTo(localFile))
if (this.deleteRemoteFiles)
session.remove(remoteFilePath);
if (this.logger.isDebugEnabled())
this.logger.debug("deleted " + remoteFilePath);
if (this.preserveTimestamp)
localFile.setLastModified(getModified(remoteFile));
答案
要并行处理消息,必须使用ExecutorChannel
。
要逐行拆分文件内容,必须使用.split(Files.splitter())
。查看其JavaDocs:
/**
* The @link AbstractMessageSplitter implementation to split the @link File Message
* payload to lines.
* <p>
* With @code iterator = true (defaults to @code true) this class produces an
* @link Iterator to process file lines on demand from @link Iterator#next. Otherwise
* a @link List of all lines is returned to the to further
* @link AbstractMessageSplitter#handleRequestMessage process.
* <p>
* Can accept @link String as file path, @link File, @link Reader or
* @link InputStream as payload type. All other types are ignored and returned to the
* @link AbstractMessageSplitter as is.
* <p>
* If @link #setFirstLineAsHeader(String) is specified, the first line of the content is
* treated as a header and carried as a header with the provided name in the messages
* emitted for the remaining lines. In this case, if markers are enabled, the line count
* in the END marker does not include the header line and, if
* @link #setApplySequence(boolean) applySequence is true, the header is not included in
* the sequence.
*
* @author Artem Bilan
* @author Gary Russell
* @author Ruslan Stelmachenko
*
* @since 4.1.2
*/
public class FileSplitter extends AbstractMessageSplitter
所有其他问题值得单独提出。在这里很难进行讨论,而且我们确实对评论有限制以及回答内容大小。
以上是关于Spring集成中的多处理的主要内容,如果未能解决你的问题,请参考以下文章
JavaEE开发之Spring中的多线程编程以及任务定时器详解