pyspark - 使用 RDD 进行聚合比 DataFrame 快得多

Posted

技术标签:

【中文标题】pyspark - 使用 RDD 进行聚合比 DataFrame 快得多【英文标题】:pyspark - Aggregate using RDDs much faster than DataFrames 【发布时间】:2018-05-07 07:49:03 【问题描述】:

我正在尝试对来自谷歌克的(大)CSV 进行简单的查找和聚合。为此,我有我的patterns_set 广播变量,其中属于我想要查找的所有键,然后我在df 中查找它们,pyspark.sql.DataFrame 使用databricks.csv 格式创建。 所以我想按ngram(col 0)分组,然后对match_count(col 1)求和。

但是当我在本地尝试时(16 毫秒与 43 秒),使用 RDD 或使用 DataFrame 计算该作业之间存在巨大差异。不完全确定集群上也会发生这种情况 - 这是预期的吗?

%%time
from operator import itemgetter, add
df.rdd.filter(lambda x: x[0] in patterns_set.value).keyBy(itemgetter(0))\
.mapValues(itemgetter(1))\
.mapValues(int)\
.reduceByKey(add)

这需要:

CPU times: user 7.04 ms, sys: 3.24 ms, total: 10.3 ms
Wall time: 16.7 ms

但是在尝试使用数据框时:

%%time
df.filter(df.ngram.isin(patterns_set.value))\
  .groupby('ngram').sum('match_count')

墙上的时间要长得多

CPU times: user 6.78 s, sys: 1.54 s, total: 8.32 s
Wall time: 43.3 s

【问题讨论】:

这是您在笔记本单元格中的实际代码吗?仅此而已? 除了导入之外,使用databricks.csv 加载df 并使用pickle 加载patterns_set(并广播它),仅此而已。 【参考方案1】:

你的代码并不能衡量你认为它做了什么。

第一个 sn-p 非常快,因为它几乎什么都不做。 RDD 转换是惰性的,因此根本不会触及数据(或仅被访问以推断架构,具体取决于上游代码)。

根据您所展示的内容,不可能为什么第二个 sn-p 很慢,但最好的选择是元存储初始化(如果这个 sn-p 实际上首先执行)或计算执行计划所需的时间(这个尤其是在有大量列的情况下可能会发生)。与第一个 sn-p 相同,它(或多或少)是惰性的,因此实际上没有处理数据。

【讨论】:

以上是关于pyspark - 使用 RDD 进行聚合比 DataFrame 快得多的主要内容,如果未能解决你的问题,请参考以下文章

[Pyspark]RDD常用方法总结

在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合

Pyspark - 多列聚合

PySpark|比RDD更快的DataFrame

pyspark 如何像在 scala .drop 中一样删除 rdd 列

如何使用 PySpark 对两个 RDD 进行完全外连接?