具有大量列的数据帧上的 Spark 窗口函数

Posted

技术标签:

【中文标题】具有大量列的数据帧上的 Spark 窗口函数【英文标题】:Spark window function on dataframe with large number of columns 【发布时间】:2018-02-19 16:43:42 【问题描述】:

我有一个从 csv 文件中读取的 ML 数据框。它包含三种类型的列:

ID 时间戳 Feature1 Feature2...Feature_n

其中 n 约为 500(用 ML 术语来说是 500 个特征)。数据集中的总行数约为 1.6 亿。

由于这是之前完全连接的结果,因此有许多特征没有设置值。

我的目标是运行一个“填充”函数(fillna 样式的表单 python pandas),其中每个空的特征值都被设置为该列先前可用的值,每个 Id 和 Date。

我正在尝试使用以下 spark 2.2.1 代码来实现这一点:

 val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)

 val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)

 val columns = Array(...) //first 30 columns initially, just to see it working

val rawDataSetFilled = columns.foldLeft(rawDataset)  (originalDF, columnToFill) =>
      originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
    

我正在使用 spark 2.2.1 在 Amazon EMR 上的 4 个 m4.large 实例上运行此作业。并启用动态分配。

作业运行超过 2 小时未完成。

我在代码级别做错了吗?鉴于数据的大小和实例,我认为它应该在合理的时间内完成?而且我什至没有尝试过完整的 500 列,只有大约 30 列!

查看容器日志,我看到的都是很多这样的日志:

INFO codegen.CodeGenerator:在 166.677493 毫秒内生成的代码

INFO 执行。ExternalAppendOnlyUnsafeRowArray:达到溢出 阈值 4096 行,切换到 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

我尝试将参数 spark.sql.windowExec.buffer.spill.threshold 设置为更大的值,没有任何影响。还有其他我应该知道的设置吗?这 2 行是我在任何容器日志中看到的唯一行。

在 Ganglia 中,我看到大多数 CPU 内核在完全使用时达到峰值,但内存使用量低于可用的最大值。所有执行者都已分配并正在工作。

【问题讨论】:

你看执行计划了吗?如果您的数据框未按 ID 重新分区并按 Id 和 DATE 在分区中排序,则将在 foldLeft 之前进行洗牌和排序。这可能是原因吗?另外,限制真的需要-50000吗?也许您应该先尝试一些较小的值,例如 -10。 【参考方案1】:

我已经设法在不使用 withColumn 调用的情况下重写了 向左折叠 逻辑。显然,对于大量列,它们可能非常慢,因此我也遇到了 *** 错误。

我很想知道为什么会有如此巨大的差异 - 以及查询计划执行幕后究竟发生了什么,这使得重复 withColumns 调用如此缓慢。

证明非常有用的链接:Spark Jira issue 和 this *** question

    var rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)    
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    rawDataset = rawDataset.select(rawDataset.columns.map(column => coalesce(col(column), last(col(column), ignoreNulls = true).over(window)).alias(column)): _*)
    rawDataset.write.option("header", "true").csv(outputLocation)

【讨论】:

不错的解决方案。现在有多快? 您可以看到一篇关于此here 的非常好的博客文章,您还可以获得一个很好的基准。

以上是关于具有大量列的数据帧上的 Spark 窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

数据帧上的 spark GROUPED_MAP udf 是不是并行运行?

没有 orderBy 的 Spark 窗口函数

具有复杂条件的 Spark SQL 窗口函数

具有复杂条件的 Spark SQL 窗口函数

如何仅对 Spark 数据帧上的特定字段使用“立方体”?

带有包含地图的数组的数据帧上的 Spark 过滤器