两个非常相似的 Spark Dataframe 之间性能差异的可能原因

Posted

技术标签:

【中文标题】两个非常相似的 Spark Dataframe 之间性能差异的可能原因【英文标题】:Possible causes of performance difference between two very similar Spark Dataframes 【发布时间】:2016-10-13 10:36:35 【问题描述】:

我正在为我正在制作原型的推荐引擎改进一些 Spark 操作的性能。我偶然发现我正在使用的 DataFrame 之间存在显着的性能差异。下面是 describe() 的结果。

df1(快速,numPartitions = 4):

+-------+------------------+--------------------+
|summary|           item_id|          popularity|
+-------+------------------+--------------------+
|  count|            187824|              187824|
|   mean| 96693.34836868558|                 1.0|
| stddev|55558.023793621316|5.281958866780519...|
|    min|                 0|  0.9999999999999998|
|    max|            192806|                 1.0|
+-------+------------------+--------------------+

df2(大约慢 10 倍,numPartitions = ±170):

+-------+-----------------+-----------------+
|summary|          item_id|            count|
+-------+-----------------+-----------------+
|  count|           187824|           187824|
|   mean|96693.34836868558|28.70869537439305|
| stddev|55558.02379362146|21.21976457710462|
|    min|                0|                1|
|    max|           192806|              482|
+-------+-----------------+-----------------+

两个 DataFrame 都已缓存,行 (187824) 和列 (2) 大小相同,并且具有相同的 item_id 列。主要区别在于第 1 帧在第二列中包含浮点数,而第 2 帧包含整数。

似乎 DataFrame 2 的每个操作都慢得多,从简单的 .describe().show() 操作到更精细的 .subtract().subtract().take()。在后一种情况下,DataFrame 2 需要 18 秒,而第 1 帧需要 2 秒(几乎慢了 10 倍!)。

我不知道从哪里开始寻找造成这种差异的原因的解释。非常感谢任何朝着正确方向的提示或轻推。

更新:正如 Viacheslav Rodionov 提出的,数据帧的分区数量似乎是 df2 性能问题的原因。

深入挖掘,这两个数据帧都是.groupBy().agg().sortBy() 对同一原始数据帧进行操作的结果。 .groupBy().agg() 操作产生 200 个分区,然后 .sortBy() 分别返回 4 和 ±170 个分区,为什么会这样?

【问题讨论】:

我先看看 df.rdd.getNumPartitions() 分区数为 174(慢)与 4(快)。感谢这个提示,我记得读过一些关于这个的东西,我会更深入地了解情况。分区数由 Spark 自动选择。尝试、错误和手动调整是解决此问题的唯一方法吗? 【参考方案1】:

我先看看df.rdd.getNumPartitions()

较少数量的较大分区几乎总是一个好主意,因为它可以更好地压缩数据并执行更多实际工作,而不是操作文件。

要查看的另一件事是您的数据的外观。它适合您正在尝试执行的任务吗?

如果它是按日期字段排序的,您使用它来应用BETWEEN 操作,它将比仅处理未排序的数据更快。 如果您使用特定的月份或年份,则按它们对数据进行分区是有意义的。 ID 也是如此。如果您使用某些 ID,请通过对数据集进行分区/排序,将相同的 ID 彼此“靠近”。

我在存储数据时的经验法则 - 首先按几个低基数字段(主要是布尔值和日期)进行分区,然后按数据重要性顺序使用 sortWithinPartitions所有 其他字段进行排序。这样,您将获得最佳压缩率(意味着更快的处理时间)和更好的数据局部性(再次更快的处理时间)。但与往常一样,这一切都取决于您的用例,请始终考虑如何处理数据并相应地进行准备。

【讨论】:

感谢您的回答。实际上,两个数据框都按它们的第二列(分别为计数和流行度)排序。此外,这两个帧都是来自同一原始数据帧的.groupBy().agg().sortBy() 操作的结果。看起来步骤groupBy.agg() 在这两种情况下都会产生 200 个分区,而sortBy 分别产生 4 和 ~170。现在试图提炼为什么会发生这种情况 我可以确认将 df2 重新分区为 4 会产生很大的性能提升。 @Fulco groupBy 将首先在本地对您的数据进行分组。检查groupBy().agg() 前后的getNumPartitions。是一样的吗?然后,当您排序时,您将首先在本地进行预排序,然后将所有数据传输到一个地方。如果您的数据按您排序的字段进行分区,并且在分区内进行预排序,那么您不必传输太多,您已经拥有正确的顺序了吗? @Fulco 如果我的回答解决了您的问题,您能否将其标记为最佳答案? 分区数的演变如下;原始=1,groupBy().agg()=200 之后,sortBy()=4 或 ±170 之后。 sortBy() 步骤是分区数量不同的部分,但我仍然不知道为什么会发生这种情况。我已经在我的本地机器上将spark.sql.shuffle.partitions 调整为 4,而不是默认的 200,这提高了 Spark 在我机器上的整体性能。当我移动到集群时,我需要弄清楚如何明智地处理此设置。

以上是关于两个非常相似的 Spark Dataframe 之间性能差异的可能原因的主要内容,如果未能解决你的问题,请参考以下文章

spark利器2函数之dataframe全局排序id与分组后保留最大值行

Spark笔记(1) :余弦相似度计算

字符串列包含通过 spark scala 精确匹配的单词

spark DataFrame 常见操作

Spark之Dataframe基本操作

Spark-SQL之DataFrame操作大全