Spark 动态 DAG 比硬编码的 DAG 慢很多
Posted
技术标签:
【中文标题】Spark 动态 DAG 比硬编码的 DAG 慢很多【英文标题】:Spark dynamic DAG is a lot slower and different from hard coded DAG 【发布时间】:2016-12-15 17:19:21 【问题描述】:我在 spark 中有一个操作,应该对数据框中的几列执行。一般来说,有两种可能来指定这样的操作
硬编码handleBias("bar", df)
.join(handleBias("baz", df), df.columns)
.drop(columnsToDrop: _*).show
从列名列表动态生成它们
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode)
if (isFirst)
res = handleBias(col, res)
isFirst = false
else
res = handleBias(col, res)
res.drop(columnsToDrop: _*).show
问题在于动态生成的 DAG 是不同的,当使用更多列时,动态解决方案的运行时间会比硬编码操作增加得更多。
我很好奇如何将优雅的动态构造与快速执行时间结合起来。
这是示例代码的 DAG 的比较
对于大约 80 列,这为硬编码变体生成了一个相当不错的图表 对于动态构造的查询,还有一个非常大、可能不太可并行化且速度较慢的 DAG。
当前版本的 spark (2.0.2) 与 DataFrames
和 spark-sql 一起使用
完成最小示例的代码:
def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame =
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
编辑
使用foldleft
运行任务会生成线性 DAG
并对所有列的函数进行硬编码会导致
两者都比我原来的 DAG 好很多,但硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码的执行图,但这看起来相当难看。您还有其他选择吗?
【问题讨论】:
我认为问题在于您的“handleBias”函数非常复杂,您需要为多个列运行它。即使你对许多列进行硬编码,你的 DAG 也会很大,所以问题可能不是“动态”应用,而是应用于许多列。因此,如果您能想出一种方法来调整您的函数以同时处理多个列,那可能会有很大帮助。 @DanieldePaula 你有什么方法可以用更简单的方式来表达这种方法,从而减少所需的计算能力? 很遗憾,我现在没有太多时间考虑,对不起。如果到明天你还没有找到解决方案,我会看看它。 @DanieldePaula 到目前为止我还想不出一个简化的方法。想知道是否可以改进缓存?目前,我在更大的数据集上调用此函数之前使用缓存。 @DanieldePaula 你认为我可以通过“连接”列来摆脱其中的一些连接吗? ***.com/questions/32882529/… 因为在这里我需要一个 concat 操作(可以并行执行)。 【参考方案1】:编辑 1: 从 handleBias 中移除一个窗口函数并将其转换为广播连接。
编辑 2: 更改了空值的替换策略。
我有一些建议可以改进您的代码。首先,对于“handleBias”函数,我会使用窗口函数和“withColumn”调用,避免连接:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
def handleBias(df: DataFrame, colName: String, target: String = "foo") =
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
val result = df
.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
result
然后,对于多列调用它,我建议使用foldLeft
,这是解决此类问题的“功能”方法:
val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")
val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df)
(currentDF, colName) => handleBias(currentDF, colName)
result.drop(columnsToDrop:_*).show()
+---+--------------------+------------------+--------+------------------+--------+
|foo| bar| pre_baz|pre2_baz| pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
| 2| noValidFormat| 0.0| 2.0| 0.0| 2.0|
| 1|lastAssumingSameDate|0.3333333333333333| 1.0|0.3333333333333333| 1.0|
| 1| second|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
| 1| first|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
+---+--------------------+------------------+--------+------------------+--------+
我不确定它是否会大大改善您的 DAG,但至少它使代码更清晰、更具可读性。
参考:
关于窗口函数的 Databricks 文章:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html 可用函数的 API 文档:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$ 左折叠:https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright【讨论】:
非常感谢这个很棒的答案。我仍然需要在更大的数据上对其进行测试。请查看编辑/您的代码生成的 2 个不同的 DAG,以进行硬编码和 foldleft 操作。为什么这些“不一样”? @GeorgHeiler 它们是不同的,因为您的硬编码版本使用连接,这通常更糟。线性 DAG 意味着不涉及任何连接,我认为它看起来比另一个更好。尝试使用更多数据后,请告诉我哪个更快.withColumn("pre_" + colName, coalesce(col("cnt_group") / col("cnt_foo_eq_1"), lit(0D)))
不是我想要实现的,而不是替换 0 我想用 class==1 的相应值替换所有空值,以便例如对于 A&foo=1 ,它是 0.5 用于替换所有为空的 A&foo=0。
@GeorgHeiler 而不是lit(0D)
,您可以引用您想要的列,例如coalesce(..., col("foo"))
当然,但据我所知,这个值没有存储在列中,例如将需要我应该阻止的额外加入? coalesce(col("cnt_group") / col("cnt_foo_eq_1")
不能重复,因为它返回 null。以上是关于Spark 动态 DAG 比硬编码的 DAG 慢很多的主要内容,如果未能解决你的问题,请参考以下文章