如何处理来自 S3 的大文件并在 Spring Batch 中使用它

Posted

技术标签:

【中文标题】如何处理来自 S3 的大文件并在 Spring Batch 中使用它【英文标题】:How to process a large file from S3 and use it in spring batch 【发布时间】:2021-12-07 15:44:42 【问题描述】:

我有一个包含数百万条记录的 CSV 文件,大小约为 2GB。我的用例是从 S3 读取 CSV 文件并进行处理。请在下面找到我的代码:

在下面的代码中,我正在从 S3 存储桶中读取一个文件,并在 Spring 批处理中直接使用 inputStream FlatFileItemReader reader.setResource(new InputStreamResource(inputStream));

根据这个实现,我在内存中保存 2GB 的内容并对其进行处理,这不是一种有效的方法 - 有人可以建议从 S3 存储桶中读取大文件并进行处理的有效方法是什么它在 Spring 批处理中。

提前感谢您的帮助!谢谢。

@Component
public class GetFileFromS3 

    public S3ObjectInputStream dowloadFile(String keyName, String bucketName, String region) 
        try 
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withClientConfiguration(new ClientConfiguration())
                    .withRegion(region).build();

            S3Object s3object = s3Client.getObject(bucketName, keyName);
            return s3object.getObjectContent();
         catch (AmazonServiceException e) 
            e.printStackTrace();
        
        return null;
    






public class SpringBatch 

    @Autowired
    private GetFileFromS3 getFileFromS3;


 @Bean(name = "csvFile")
    public Step step1() 
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    

    @Bean
    public FlatFileItemReader<Employee> reader() 
        S3ObjectInputStream inputStream = getFileFromS3.dowloadFile("employee.csv", "testBucket", "us-east-1");
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
        reader.setResource(new InputStreamResource(inputStream));
        reader.setLinesToSkip(1);
        reader.setLineMapper(new DefaultLineMapper() 
            
                setLineTokenizer(new DelimitedLineTokenizer() 
                    
                        setNames(Employee.fields());
                    
                );
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() 
                    
                        setTargetType(Employee.class);
                    
                );
            
        );
        return reader;
    

    @Bean
    public ItemProcessor<Employee, Employee> processor() 
        return new ItemProcessor();
    

    @Bean
    public ItemWriter<Employee> writer() 
        return new ItemWriter<Event>();
    

    

【问题讨论】:

【参考方案1】:

利用 ResourceLoader,我们可以像读取其他资源一样在 ItemReader 中读取 S3 中的文件。这将有助于以块的形式读取 S3 中的文件,而不是将整个文件加载到内存中。

ResourceLoaderAmazonS3 client 注入依赖项后,阅读器配置更改如下:

根据需要替换 sourceBucketsourceObjectPrefix 的值。

@Autowired
private ResourceLoader resourceLoader;

@Autowired
private AmazonS3 amazonS3Client;

// READER
@Bean(destroyMethod="")
@StepScope
public SynchronizedItemStreamReader<Employee> employeeDataReader() 
    SynchronizedItemStreamReader synchronizedItemStreamReader = new SynchronizedItemStreamReader();
    List<Resource> resourceList = new ArrayList<>();
    String sourceBucket = yourBucketName;
    String sourceObjectPrefix = yourSourceObjectPrefix;
    log.info("sourceObjectPrefix::"+sourceObjectPrefix);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(sourceBucket)
            .withPrefix(sourceObjectPrefix);
    ObjectListing sourceObjectsListing;
    do
        sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries())

            if(!(sourceFile.getSize() > 0)
                    || (!sourceFile.getKey().endsWith(DOT.concat("csv")))
            )
                // Skip if file is empty (or) file extension is not "csv"
                continue;
            
            log.info("Reading "+sourceFile.getKey());
            resourceList.add(resourceLoader.getResource("s3://".concat(sourceBucket).concat("/")
                    .concat(sourceFile.getKey())));
        
        listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
    while(sourceObjectsListing.isTruncated());

    Resource[] resources = resourceList.toArray(new Resource[resourceList.size()]);
    MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setName("employee-multiResource-Reader");
    multiResourceItemReader.setResources(resources);
    multiResourceItemReader.setDelegate(employeeFileItemReader());
    synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
    return synchronizedItemStreamReader;


@Bean
@StepScope
public FlatFileItemReader<Employee> employeeFileItemReader()

    FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper() 
        
            setLineTokenizer(new DelimitedLineTokenizer() 
                
                    setNames(Employee.fields());
                
            );
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() 
                
                    setTargetType(Employee.class);
                
            );
        
    );
    return reader;

以 MultiResourceItemReader 为例。即使您要查找的特定 S3 路径中有多个 CSV 文件,这也可以工作。

如果在一个位置只处理一个 CSV 文件,它也可以隐式地工作,Resources[] resources 包含一个条目。

【讨论】:

以上是关于如何处理来自 S3 的大文件并在 Spring Batch 中使用它的主要内容,如果未能解决你的问题,请参考以下文章

如何处理来自用户输入的偶数数组并在 C++ 中用空格显示它们

aws s3 如何处理覆盖文件和访问?

如何处理 R 中的大 JSON 文件?

如何处理来自catch块的spring jpa @transactional和新插入[重复]

如何处理在WPF TreeView中显示数据的大延迟

Android/Parse-如何处理来自 ParseQuery 的多个回调