使用 Spring 集成轮询 S3 存储桶以获取文件

Posted

技术标签:

【中文标题】使用 Spring 集成轮询 S3 存储桶以获取文件【英文标题】:Polling S3 Bucket for file using Spring integration 【发布时间】:2018-12-29 15:50:23 【问题描述】:

我正在处理一个项目,我需要轮询 S3 存储桶中的文件并上传到不同的 S3 存储桶中。作为实现它的第一步,我正在尝试轮询 S3 存储桶以查找创建的新文件,并使用 Spring Integration 在我的本地目录中创建它们。为了实现这一点,我使用 maven 创建了一个简单的 spring-boot 应用程序,具有以下对象轮询配置,同时处理 fileReading IntegrationFlow

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class ObjectPollerConfiguration 
    @Value("$amazonProperties.bucketName")
    private String bucketName;
    public static final String OUTPUT_DIR2 = "target2";
    @Autowired
    private AmazonClient amazonClient;
    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer() 
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonClient.getS3Client());
        synchronizer.setDeleteRemoteFiles(true);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setRemoteDirectory(bucketName);            
        return synchronizer;
    
    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() 
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File("."));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return messageSource;
    
    @Bean
    public PollableChannel s3FilesChannel() 
        return new QueueChannel();
    
    @Bean
    IntegrationFlow fileReadingFlow() 
        return IntegrationFlows
                .from(s3InboundFileSynchronizingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
                .handle(fileProcessor())
                .get();
    
    @Bean
    public MessageHandler fileProcessor() 
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR2));
        handler.setExpectReply(false); // end of pipeline, reply not needed
        return handler;
    
*

但是当我将我的应用程序作为 java 应用程序启动并将文件上传到 S3 时,我没有看到带有文件的 target2 目录,也没有得到任何与轮询执行相对应的日志。有人可以帮我让它工作吗?

【问题讨论】:

【参考方案1】:

我认为你没有使用 OUTPUT_DIR2 属性来推入本地目录的问题。

您的本地目录代码如下:

messageSource.setLocalDirectory(new File("."));

这完全不是您想要的。 尝试将其更改为

messageSource.setLocalDirectory(new File(OUTPUT_DIR2));

【讨论】:

谢谢阿特姆。让我试试。同时,我为 S3 对象上传和消费创建了发布事件 我正在处理一个我应该轮询 S3 的用例 -> 读取图像内容的流 -> 进行一些处理并将其上传到另一个存储桶,而不是在我的服务器中写入文件。你能帮我用 S3StreamingMessageSource 实现这个吗? 让我为此创建一个问题。请回答 请不要推动开源社区。我们根据意愿提供帮助。如果我们决定不回答,您将不会收到我们的消息。我们这边也确实有一些工作要做……尊重他人的时间和资源。 我很抱歉以这种方式寻求帮助。我理解并尊重他人的时间和资源。将确保我以后不会那样说

以上是关于使用 Spring 集成轮询 S3 存储桶以获取文件的主要内容,如果未能解决你的问题,请参考以下文章

定期轮询具有大量文件的 AWS S3 存储桶中的新文件?

Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次

Powershell:使用临时凭证访问AWS s3存储桶

Spring集成中的多处理

aws 胶水主要丢弃空字段

轮询新 S3 对象的最佳方法?