使用 spark 下载、处理、上传大量 s3 文件

Posted

技术标签:

【中文标题】使用 spark 下载、处理、上传大量 s3 文件【英文标题】:download, process, upload large number of s3 files with spark 【发布时间】:2016-06-17 13:46:06 【问题描述】:

我在一个 s3 存储桶中有大量文件 (~500k hdf5),我需要对其进行处理并重新上传到另一个 s3 存储桶。

我对这些任务很陌生,所以我不太确定我的方法在这里是否正确。我执行以下操作: 我使用 boto 来获取桶内的键列表并与 spark 并行化:

s3keys = bucket.list()
data = sc.parallelize(s3keys)
data = data.map(lambda x: download_process_upload(x))
result = data.collect()

其中download_process_upload 是一个函数,它下载密钥指定的文件,对其进行一些处理并将其重新上传到另一个存储桶(如果一切成功则返回 1,如果有错误则返回 0) 所以最后我可以做到

success_rate = sum(result) / float(len(s3keys))

我读过 sparkmap 语句应该是无状态的,而我的自定义地图函数绝对不是无状态的。它将文件下载到磁盘,然后将其加载到内存等中。

那么这是执行此类任务的正确方法吗?

【问题讨论】:

【参考方案1】:

我已成功使用您的方法从 S3 下载和处理数据。我没有尝试从地图声明中上传数据。但是,我认为您没有理由无法从 s3 读取文件、处理它,然后将其上传到新位置。

此外,您可以节省一些击键,并像 data = data.map(download_process_upload) 那样从 map 语句中取出显式 lambda

【讨论】:

以上是关于使用 spark 下载、处理、上传大量 s3 文件的主要内容,如果未能解决你的问题,请参考以下文章

从 Apache Spark 分段上传到 Amazon S3

PySpark S3 文件读取性能考虑

Spark S3A 写入省略了上传部分而没有失败

如何在不下载文件的情况下搜索amazon S3存储桶中的文件内容

针对大量小文件优化 S3 下载

从 EMR Spark 处理 s3 gzip 文件的有效方法