Spark Dataframe 在 EMR 上加载 500k 文件
Posted
技术标签:
【中文标题】Spark Dataframe 在 EMR 上加载 500k 文件【英文标题】:Spark Dataframe loading 500k files on EMR 【发布时间】:2018-06-13 17:14:57 【问题描述】:我正在使用 Spark 2.1.0、Hadoop 2.7.3、Hive 2.1.1、Sqoop 1.4.6 和 Ganglia 3.7.2 在 EMR ( 5.5.1 ) 上运行 pyspark 作业,该作业从 s3 加载数据。有多个包含输入文件的存储桶,所以我有一个函数使用 boto 遍历它们并根据某种模式将它们过滤掉。
集群大小:Master => r4.xlarge,Worker => 3 x r4.4xlarge
问题:
函数 getFilePaths
返回一个 s3 路径列表,该路径直接馈送到 spark 数据帧加载方法。
使用数据框
file_list = getFilePaths() # ['s3://some_bucket/log.json.gz','s3://some_bucket/log2.json.gz']
schema = getSchema() # for mapping to the json files
df = sparkSession.read.format('json').load(file_list, schema=schema)
使用 RDD
master_rdd = sparkSession.sparkContext.union(
map(lambda file: sparkSession.sparkContext.textFile(file), file_list)
)
df = sparkSession.createDataFrame(master_rdd, schema=schema)
由于大量的数据和文件,file_list
可能是一个巨大的列表(最多 500k 个文件)。这些路径的计算只需要 5-20 分钟,但是当尝试使用 spark 将它们作为数据帧加载时,spark UI 会在 小时 内保持不活动状态,即根本不处理任何内容。 处理 500k 文件的不活动时间超过 9 小时,而处理 100k 文件的不活动时间约为 1.5 小时。
查看 Gangilla 指标显示只有驱动程序在运行/处理,而工作人员处于空闲状态。在 spark 作业完成之前不会生成任何日志,并且我对 500k 个文件没有任何成功。
我尝试过 s3、s3n 连接器,但没有成功。
问题:
找出造成这种延迟的根本原因? 如何正确调试?【问题讨论】:
在 spark 无法并行读取文件时遇到了一些问题,因此走这条路 ***.com/questions/28685874/… 【参考方案1】:一般来说,Spark/Hadoop 更喜欢拥有可以拆分的大文件,而不是大量的小文件。您可能会尝试的一种方法是并行化您的文件列表,然后在地图调用中加载数据。
我现在没有资源来测试这个,但它应该类似于这个:
file_list = getFilePaths()
schema = getSchema() # for mapping to the json files
paths_rdd = sc.parallelize(file_list)
def get_data(path):
s3 = boto3.resource('s3')
obj = s3.Object(bucket, path)
data = obj.get()['Body'].read().decode('utf-8')
return [json.loads(r) for r in data.split('\n')]
rows_rdd = rdd.flatMap(get_data)
df = spark.createDataFrame(rows_rdd, schema=schema)
您也可以改用 mapPartition 来提高效率,这样您就不需要每次都重新创建 s3 对象。
2018 年 6 月 14 日编辑:
关于处理 gzip 数据,您可以使用 python 解压缩 gzip 数据流,详见此答案:https://***.com/a/12572031/1461187。基本上只需将obj.get()['Body'].read()
传入该答案中定义的函数即可。
【讨论】:
这些文件大约是。每个大小为 60mb,因此我无法进一步合并它们。我会试一试,看看它是否有助于提高性能。 只要您选择了可拆分的文件格式(即不是 gzip),您就应该能够将它们组合起来。它可能需要一些额外的数据加载步骤来转换您的数据,但如果您可以在保存到 S3 或对其进行任何广泛的工作之前将所有内容转换为 Parquet 或 Avro 开始,那么使用起来会容易得多。然后,您无需额外工作即可获得模式读取和文件拆分。 是的,这是我的意图,但首先我想让它工作。 Json 慢很多,所以我在开始处理它们之前正在考虑镶木地板。 Parquet 在使用 HDFS 时非常棒,但对于 S3,请确保查看需要设置的优化以避免额外写入。出于这个原因,我通常更喜欢 avro 和 S3。 有没有办法可以直接将文件路径与 dataframe 一起使用?我想避免使用 rdds【参考方案2】:出现了两个性能问题
-
读取文件:gzip 文件无法拆分以在工作人员之间共享工作负载,尽管对于 50 MB 的文件,拆分文件几乎没有什么好处
S3 连接器 spark 使用模仿目录结构的方式是复杂目录树的真正性能杀手。
问题 #2 减慢了分区速度:决定做什么的初始代码,在任何计算之前完成。
我将如何解决这个问题?好吧,这里没有魔法开关。但是
拥有更少、更大的文件;如前所述,Avro 很好,Parquet 和 ORC 后来也很好。 使用非常浅的目录树。这些文件都在一个目录中吗?还是在深层目录树中?后者更糟。首先合并文件。
我也会避免任何形式的模式推断;听起来你没有这样做(好!),但对于其他阅读此答案的人:知道对于 CSV 和大概 JSON,模式推断意味着“读取所有数据一次只是为了计算模式”
【讨论】:
不幸的是,它们处于深层目录结构中。我计划使用与 json 不同的压缩格式,但有一些限制,所以我必须至少使其适用于当前设置。 压缩更改会加快工作人员的更改,但对前面的分区过程不做任何事情。对不起以上是关于Spark Dataframe 在 EMR 上加载 500k 文件的主要内容,如果未能解决你的问题,请参考以下文章
为啥 AWS EMR 上的 Spark 不从应用程序 fat jar 加载类?
在 EMR 中使用 spark ad scala 从 redshift 加载数据