Spark DAG 与“withColumn”和“select”不同
Posted
技术标签:
【中文标题】Spark DAG 与“withColumn”和“select”不同【英文标题】:Spark DAG differs with 'withColumn' vs 'select' 【发布时间】:2020-05-04 11:30:22 【问题描述】:上下文
在最近的SO-post 中,我发现使用withColumn
可以在结合不同的窗口规范处理堆叠/链列表达式时改进 DAG。然而,在这个例子中,withColumn
实际上使 DAG 变得更糟,并且与使用 select
的结果不同。
可重现的例子
首先,一些测试数据(PySpark 2.4.4 独立):
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dfp = pd.DataFrame(
"col1": np.random.randint(0, 5, size=100),
"col2": np.random.randint(0, 5, size=100),
"col3": np.random.randint(0, 5, size=100),
"col4": np.random.randint(0, 5, size=100),
"col5": np.random.randint(0, 5, size=100),
)
df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
| 0| 3| 2| 2| 2|
| 1| 3| 3| 2| 4|
| 0| 0| 3| 3| 2|
| 3| 0| 1| 4| 4|
| 4| 0| 3| 3| 3|
+----+----+----+----+----+
only showing top 5 rows
这个例子很简单。 In 包含 2 个窗口规范和 4 个基于它们的独立列表达式:
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")
col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")
expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]
withColumn - 4 次随机播放
如果 withColumn
与交替窗口规范一起使用,则 DAG 会创建不必要的随机播放:
df.withColumn("col_w1_1", col_w1_1)\
.withColumn("col_w2_1", col_w2_1)\
.withColumn("col_w1_2", col_w1_2)\
.withColumn("col_w2_2", col_w2_2)\
.explain()
== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
选择 - 2 次随机播放
如果所有列都以select
传递,则 DAG 是正确的。
df.select("*", *expr).explain()
== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#90L, 200)
+- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
+- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#88L, 200)
+- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
问题
有一些关于为什么要避免使用withColumn
的现有信息,但是它们主要关注的是多次调用withColumn
,并且它们没有解决偏离DAG 的问题(请参阅here 和here )。有谁知道为什么 DAG 在 withColumn
和 select
之间有所不同? Spark 的优化算法应该适用于任何情况,并且不应该依赖于不同的方式来表达完全相同的东西。
提前致谢。
【问题讨论】:
【参考方案1】:何时使用嵌套的 withColumns 和窗口函数?
假设我想做:
w1 = ...rangeBetween(-300, 0)
w2 = ...rowsBetween(-1,0)
(df.withColumn("some1", col(f.max("original1").over(w1))
.withColumn("some2", lag("some1")).over(w2)).show()
即使数据集非常小,我也会遇到很多内存问题和大量溢出。如果我使用 select 而不是 withColumn 做同样的事情,它的执行速度会更快。
df.select(
f.max(col("original1")).over(w1).alias("some1"),
f.lag("some1")).over(w2)
).show()
【讨论】:
感谢您的回答!我会对一个最小的可重现示例感兴趣,该示例允许观察您描述的内存问题和高溢出。【参考方案2】:这看起来像是由withColumn
引起的内部投影的结果。在 Spark 文档中记录了 here
官方建议是按照 Jay 的建议去做,而不是在处理多列时做一个 select
【讨论】:
以上是关于Spark DAG 与“withColumn”和“select”不同的主要内容,如果未能解决你的问题,请参考以下文章