PySpark .groupBy() 和 .count() 在相​​对较小的 Dataframe 上运行缓慢

Posted

技术标签:

【中文标题】PySpark .groupBy() 和 .count() 在相​​对较小的 Dataframe 上运行缓慢【英文标题】:PySpark .groupBy() and .count() slow on a relatively small Dataframe 【发布时间】:2017-01-12 11:19:51 【问题描述】:

好的,让我先描述一下我是如何创建Dataframe 的以及其中的内容。

我有一组压缩过的 html 文档和一组压缩过的元数据到这些 HTML 文档

对于这两个,我提供了一个RDDs 的路径列表,如下所示:

Wet_Paths_RDD = sc.parallelize(Wet_Paths)
Wet_RDD = Wet_Paths_RDD.map(open_wet_filelist).flatMap(split_wetfiles)

我以这样的方式准备两个 RDD:

(k,(some,other,values))

然后我像这样将我的元数据RDD 和我的内容RDD 连接在一起:

Wat_Wet_RDD = Wat_RDD.join(Wet_RDD)

然后我解压缩现在相对复杂的元组并进行语言检测。我必须加入RDDs,因为到目前为止,我所有的字符串都表示为byte strings,无法在Dataframe 中表示。

Wat_Wet_RDD = Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)

然后我将加入的RDD 转移到Dataframe

wat_wet_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Links", StringType(), True),
    StructField("N_Links", IntegerType(), True),
    StructField("Content_type", StringType(), True),
    StructField("Original_Encoding", StringType(), True),
    StructField("Content", StringType(), True),
    StructField("Language", StringType(), True),
    StructField("Language_confidence", IntegerType(), True),
])

WatWet_DF = sqlContext.createDataFrame(Wat_Wet_RDD, schema=wat_wet_schema)

然后看看它:

print(WatWet_DF.show(20))

到目前为止,一切都需要 24 分钟,但下一步:

print(WatWet_DF.groupBy(WatWet_DF.Language).count().orderBy(desc('count')).show(100))

我在 24 小时后中止了此阶段的任何任务都没有解决。

目前我正在一个测试 Linux 虚拟机上运行集群。 VM 有 4 个核心,同时运行 Master 和 Worker。 Worker 有 4 个刽子手,每个有 3.5G 的内存。 Dataframe 应该由大约 100 万行组成。 Apache Spark 版本为 2.1.0,使用 python 3.5。 VM 运行在具有 24G RAM 的过时 Xeon W3680 6(v12) 内核之上。

【问题讨论】:

WatWet_DF 中有多少行? 应该是给或拿一百万。 .count() 一个人已经花费了很长时间。 WatWet_DF.content 字段包含整个文档,所以也许这就是 Dataframe 慢的原因? 【参考方案1】:

好的,所以我发现了为什么 .count().groupBy() 在这个数据集上花费的时间比 .show() 长得多。原因是.count().groupBy()要全部提供结果,这里Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)在map阶段执行的函数需要应用于整个数据集。要让.show() 提供结果,只需将这些函数应用于整个数据集的子集,从而更快地提供结果。现在映射阶段,Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect) 有一些非常昂贵的函数,导致计算时间非常长,尤其是当 .count().groupBy().show() 进行比较时。

【讨论】:

以上是关于PySpark .groupBy() 和 .count() 在相​​对较小的 Dataframe 上运行缓慢的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 中的 Groupby 和标准化值

PySpark Groupby 和接收特定列

PySpark 1.5 Groupby Sum 用于 Dataframe 中的新列

PySpark groupby 和最大值选择

PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构

PySpark .groupBy() 和 .count() 在相​​对较小的 Dataframe 上运行缓慢