在 Spark DataFrame 中计算大于 0 的值的更快方法?
Posted
技术标签:
【中文标题】在 Spark DataFrame 中计算大于 0 的值的更快方法?【英文标题】:Faster way to count values greater than 0 in Spark DataFrame? 【发布时间】:2018-07-13 21:46:06 【问题描述】:我有一个 Spark DataFrame
,其中所有字段都是整数类型。我需要计算有多少单个单元格大于 0。
我在本地运行,有一个 DataFrame
,有 17,000 行和 450 列。
我尝试了两种方法,都产生了缓慢的结果:
版本 1:
(for (c <- df.columns) yield df.where(s"$c > 0").count).sum
版本 2:
df.columns.map(c => df.filter(df(c) > 0).count)
此计算需要 80 秒的挂钟时间。使用 Python Pandas,只需几分之一秒。我知道对于小型数据集和本地操作,Python may perform better,但这似乎很极端。
尝试进行 Spark 到 Spark 的比较,我发现在相同的数据(转换为 RowMatrix)上运行 MLlib 的 PCA 算法只需不到 2 秒!
我应该使用更有效的实现吗?
如果不是,那看似复杂得多的 PCA 计算怎么会这么快?
【问题讨论】:
Why is Apache-Spark - Python so slow locally as compared to pandas? 你用你的数据试过给定的答案了吗? 【参考方案1】:做什么
import org.apache.spark.sql.functions.col, count, when
df.select(df.columns map (c => count(when(col(c) > 0, 1)) as c): _*)
为什么
您的两次尝试都创建了与列数成比例的作业数。单独计算执行计划和调度作业的成本很高,并且会根据数据量增加大量开销。
此外,数据可能会在每次执行作业时从磁盘加载和/或解析,除非数据被完全缓存并具有显着的内存安全余量,以确保缓存的数据不会被驱逐。
这意味着在最坏的情况下,您使用的类似嵌套循环的结构在列数方面可能大致呈二次方。
上面显示的代码同时处理所有列,只需要一次数据扫描。
【讨论】:
虽然我选择了拉斐尔的答案,但这个解决方案也有效。两种方法都需要大约 2.5 秒挂钟【参考方案2】:您的方法的问题是扫描文件的每一列(除非您已将其缓存在内存中)。使用单个 FileScan 的最快方法应该是:
import org.apache.spark.sql.functions.explode,array
val cnt: Long = df
.select(
explode(
array(df.columns.head,df.columns.tail:_*)
).as("cell")
)
.where($"cell">0).count
我仍然认为它会比使用 Pandas 慢,因为 Spark 由于并行化引擎而具有一定的开销
【讨论】:
以上是关于在 Spark DataFrame 中计算大于 0 的值的更快方法?的主要内容,如果未能解决你的问题,请参考以下文章
Spark——RDD和DataFrame和DataSet三者间的区别