在 Spark DataFrame 中计算大于 0 的值的更快方法?

Posted

技术标签:

【中文标题】在 Spark DataFrame 中计算大于 0 的值的更快方法?【英文标题】:Faster way to count values greater than 0 in Spark DataFrame? 【发布时间】:2018-07-13 21:46:06 【问题描述】:

我有一个 Spark DataFrame,其中所有字段都是整数类型。我需要计算有多少单个单元格大于 0。

我在本地运行,有一个 DataFrame,有 17,000 行和 450 列。

我尝试了两种方法,都产生了缓慢的结果:

版本 1:

(for (c <- df.columns) yield df.where(s"$c > 0").count).sum

版本 2:

df.columns.map(c => df.filter(df(c) > 0).count)

此计算需要 80 秒的挂钟时间。使用 Python Pandas,只需几分之一秒。我知道对于小型数据集和本地操作,Python may perform better,但这似乎很极端。

尝试进行 Spark 到 Spark 的比较,我发现在相同的数据(转换为 RowMatrix)上运行 MLlib 的 PCA 算法只需不到 2 秒!

我应该使用更有效的实现吗?

如果不是,那看似复杂得多的 PCA 计算怎么会这么快?

【问题讨论】:

Why is Apache-Spark - Python so slow locally as compared to pandas? 你用你的数据试过给定的答案了吗? 【参考方案1】:

做什么

import org.apache.spark.sql.functions.col, count, when

df.select(df.columns map (c => count(when(col(c) > 0, 1)) as c): _*)

为什么

您的两次尝试都创建了与列数成比例的作业数。单独计算执行计划和调度作业的成本很高,并且会根据数据量增加大量开销。

此外,数据可能会在每次执行作业时从磁盘加载和/或解析,除非数据被完全缓存并具有显着的内存安全余量,以确保缓存的数据不会被驱逐。

这意味着在最坏的情况下,您使用的类似嵌套循环的结构在列数方面可能大致呈二次方。

上面显示的代码同时处理所有列,只需要一次数据扫描。

【讨论】:

虽然我选择了拉斐尔的答案,但这个解决方案也有效。两种方法都需要大约 2.5 秒挂钟【参考方案2】:

您的方法的问题是扫描文件的每一列(除非您已将其缓存在内存中)。使用单个 FileScan 的最快方法应该是:

import org.apache.spark.sql.functions.explode,array

val cnt: Long = df
  .select(
    explode(
      array(df.columns.head,df.columns.tail:_*)
    ).as("cell")
  )
.where($"cell">0).count

我仍然认为它会比使用 Pandas 慢,因为 Spark 由于并行化引擎而具有一定的开销

【讨论】:

以上是关于在 Spark DataFrame 中计算大于 0 的值的更快方法?的主要内容,如果未能解决你的问题,请参考以下文章

计算Spark DataFrame中的非空值的数量

Spark Dataframe Join shuffle

Spark——RDD和DataFrame和DataSet三者间的区别

计算spark Dataframe中分组数据的分位数

如何计算Apache Spark DataFrame中所有列的不同值的数量[重复]

RDD和DataFrame和DataSet三者间的区别