Spark 动态 DAG 比硬编码的 DAG 慢很多

Posted

技术标签:

【中文标题】Spark 动态 DAG 比硬编码的 DAG 慢很多【英文标题】:Spark dynamic DAG is a lot slower and different from hard coded DAG 【发布时间】:2016-12-15 17:19:21 【问题描述】:

我在 spark 中有一个操作,应该对数据框中的几列执行。一般来说,有两种可能来指定这样的操作

硬编码
handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*).show
从列名列表动态生成它们
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode) 
  if (isFirst) 
    res = handleBias(col, res)
    isFirst = false
   else 
    res = handleBias(col, res)
  

res.drop(columnsToDrop: _*).show

问题在于动态生成的 DAG 是不同的,当使用更多列时,动态解决方案的运行时间会比硬编码操作增加得更多。

我很好奇如何将优雅的动态构造与快速执行时间结合起来

这是示例代码的 DAG 的比较

对于大约 80 列,这为硬编码变体生成了一个相当不错的图表 对于动态构造的查询,还有一个非常大、可能不太可并行化且速度较慢的 DAG。

当前版本的 spark (2.0.2) 与 DataFrames 和 spark-sql 一起使用

完成最小示例的代码:

def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = 
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)

编辑

使用foldleft 运行任务会生成线性 DAG 并对所有列的函数进行硬编码会导致

两者都比我原来的 DAG 好很多,但硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码的执行图,但这看起来相当难看。您还有其他选择吗?

【问题讨论】:

我认为问题在于您的“handleBias”函数非常复杂,您需要为多个列运行它。即使你对许多列进行硬编码,你的 DAG 也会很大,所以问题可能不是“动态”应用,而是应用于许多列。因此,如果您能想出一种方法来调整您的函数以同时处理多个列,那可能会有很大帮助。 @DanieldePaula 你有什么方法可以用更简单的方式来表达这种方法,从而减少所需的计算能力? 很遗憾,我现在没有太多时间考虑,对不起。如果到明天你还没有找到解决方案,我会看看它。 @DanieldePaula 到目前为止我还想不出一个简化的方法。想知道是否可以改进缓存?目前,我在更大的数据集上调用此函数之前使用缓存。 @DanieldePaula 你认为我可以通过“连接”列来摆脱其中的一些连接吗? ***.com/questions/32882529/… 因为在这里我需要一个 concat 操作(可以并行执行)。 【参考方案1】:

编辑 1: 从 handleBias 中移除一个窗口函数并将其转换为广播连接。

编辑 2: 更改了空值的替换策略。

我有一些建议可以改进您的代码。首先,对于“handleBias”函数,我会使用窗口函数和“withColumn”调用,避免连接:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = 
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  val result = df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
  result

然后,对于多列调用它,我建议使用foldLeft,这是解决此类问题的“功能”方法:

val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) 
  (currentDF, colName) => handleBias(currentDF, colName)


result.drop(columnsToDrop:_*).show()

+---+--------------------+------------------+--------+------------------+--------+
|foo|                 bar|           pre_baz|pre2_baz|           pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
|  2|       noValidFormat|               0.0|     2.0|               0.0|     2.0|
|  1|lastAssumingSameDate|0.3333333333333333|     1.0|0.3333333333333333|     1.0|
|  1|              second|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
|  1|               first|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
+---+--------------------+------------------+--------+------------------+--------+

我不确定它是否会大大改善您的 DAG,但至少它使代码更清晰、更具可读性。

参考:

关于窗口函数的 Databricks 文章:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html 可用函数的 API 文档:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$ 左折叠:https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright

【讨论】:

非常感谢这个很棒的答案。我仍然需要在更大的数据上对其进行测试。请查看编辑/您的代码生成的 2 个不同的 DAG,以进行硬编码和 foldleft 操作。为什么这些“不一样”? @GeorgHeiler 它们是不同的,因为您的硬编码版本使用连接,这通常更糟。线性 DAG 意味着不涉及任何连接,我认为它看起来比另一个更好。尝试使用更多数据后,请告诉我哪个更快 .withColumn("pre_" + colName, coalesce(col("cnt_group") / col("cnt_foo_eq_1"), lit(0D))) 不是我想要实现的,而不是替换 0 我想用 class==1 的相应值替换所有空值,以便例如对于 A&foo=1 ,它是 0.5 用于替换所有为空的 A&foo=0。 @GeorgHeiler 而不是lit(0D),您可以引用您想要的列,例如coalesce(..., col("foo")) 当然,但据我所知,这个值没有存储在列中,例如将需要我应该阻止的额外加入? coalesce(col("cnt_group") / col("cnt_foo_eq_1") 不能重复,因为它返回 null。

以上是关于Spark 动态 DAG 比硬编码的 DAG 慢很多的主要内容,如果未能解决你的问题,请参考以下文章

spark中的RDD以及DAG

spark-DAG,宽窄依赖,Stage,Shuffle

在 Apache Spark 中创建 DAG

Spark DAG 依赖关系 Stage

DAG 如何让 Apache Spark 容错?

spark是不是优化了pyspark中相同但独立的DAG?