如何处理来自 S3 的大文件并在 Spring Batch 中使用它
Posted
技术标签:
【中文标题】如何处理来自 S3 的大文件并在 Spring Batch 中使用它【英文标题】:How to process a large file from S3 and use it in spring batch 【发布时间】:2021-12-07 15:44:42 【问题描述】:我有一个包含数百万条记录的 CSV 文件,大小约为 2GB。我的用例是从 S3 读取 CSV 文件并进行处理。请在下面找到我的代码:
在下面的代码中,我正在从 S3 存储桶中读取一个文件,并在 Spring 批处理中直接使用 inputStream
FlatFileItemReader reader.setResource(new InputStreamResource(inputStream));
根据这个实现,我在内存中保存 2GB 的内容并对其进行处理,这不是一种有效的方法 - 有人可以建议从 S3 存储桶中读取大文件并进行处理的有效方法是什么它在 Spring 批处理中。
提前感谢您的帮助!谢谢。
@Component
public class GetFileFromS3
public S3ObjectInputStream dowloadFile(String keyName, String bucketName, String region)
try
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withClientConfiguration(new ClientConfiguration())
.withRegion(region).build();
S3Object s3object = s3Client.getObject(bucketName, keyName);
return s3object.getObjectContent();
catch (AmazonServiceException e)
e.printStackTrace();
return null;
public class SpringBatch
@Autowired
private GetFileFromS3 getFileFromS3;
@Bean(name = "csvFile")
public Step step1()
return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
@Bean
public FlatFileItemReader<Employee> reader()
S3ObjectInputStream inputStream = getFileFromS3.dowloadFile("employee.csv", "testBucket", "us-east-1");
FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
reader.setResource(new InputStreamResource(inputStream));
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper()
setLineTokenizer(new DelimitedLineTokenizer()
setNames(Employee.fields());
);
setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>()
setTargetType(Employee.class);
);
);
return reader;
@Bean
public ItemProcessor<Employee, Employee> processor()
return new ItemProcessor();
@Bean
public ItemWriter<Employee> writer()
return new ItemWriter<Event>();
【问题讨论】:
【参考方案1】:利用 ResourceLoader,我们可以像读取其他资源一样在 ItemReader 中读取 S3 中的文件。这将有助于以块的形式读取 S3 中的文件,而不是将整个文件加载到内存中。
为ResourceLoader
和AmazonS3 client
注入依赖项后,阅读器配置更改如下:
根据需要替换 sourceBucket
和 sourceObjectPrefix
的值。
@Autowired
private ResourceLoader resourceLoader;
@Autowired
private AmazonS3 amazonS3Client;
// READER
@Bean(destroyMethod="")
@StepScope
public SynchronizedItemStreamReader<Employee> employeeDataReader()
SynchronizedItemStreamReader synchronizedItemStreamReader = new SynchronizedItemStreamReader();
List<Resource> resourceList = new ArrayList<>();
String sourceBucket = yourBucketName;
String sourceObjectPrefix = yourSourceObjectPrefix;
log.info("sourceObjectPrefix::"+sourceObjectPrefix);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
.withBucketName(sourceBucket)
.withPrefix(sourceObjectPrefix);
ObjectListing sourceObjectsListing;
do
sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries())
if(!(sourceFile.getSize() > 0)
|| (!sourceFile.getKey().endsWith(DOT.concat("csv")))
)
// Skip if file is empty (or) file extension is not "csv"
continue;
log.info("Reading "+sourceFile.getKey());
resourceList.add(resourceLoader.getResource("s3://".concat(sourceBucket).concat("/")
.concat(sourceFile.getKey())));
listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
while(sourceObjectsListing.isTruncated());
Resource[] resources = resourceList.toArray(new Resource[resourceList.size()]);
MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
multiResourceItemReader.setName("employee-multiResource-Reader");
multiResourceItemReader.setResources(resources);
multiResourceItemReader.setDelegate(employeeFileItemReader());
synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
return synchronizedItemStreamReader;
@Bean
@StepScope
public FlatFileItemReader<Employee> employeeFileItemReader()
FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper()
setLineTokenizer(new DelimitedLineTokenizer()
setNames(Employee.fields());
);
setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>()
setTargetType(Employee.class);
);
);
return reader;
以 MultiResourceItemReader 为例。即使您要查找的特定 S3 路径中有多个 CSV 文件,这也可以工作。
如果在一个位置只处理一个 CSV 文件,它也可以隐式地工作,Resources[] resources
包含一个条目。
【讨论】:
以上是关于如何处理来自 S3 的大文件并在 Spring Batch 中使用它的主要内容,如果未能解决你的问题,请参考以下文章
如何处理来自用户输入的偶数数组并在 C++ 中用空格显示它们