Spark DAG 与“withColumn”和“select”不同

Posted

技术标签:

【中文标题】Spark DAG 与“withColumn”和“select”不同【英文标题】:Spark DAG differs with 'withColumn' vs 'select' 【发布时间】:2020-05-04 11:30:22 【问题描述】:

上下文

在最近的SO-post 中,我发现使用withColumn 可以在结合不同的窗口规范处理堆叠/链列表达式时改进 DAG。然而,在这个例子中,withColumn 实际上使 DAG 变得更糟,并且与使用 select 的结果不同。

可重现的例子

首先,一些测试数据(PySpark 2.4.4 独立):

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),      
        "col5": np.random.randint(0, 5, size=100),        

    
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
|   1|   3|   3|   2|   4|
|   0|   0|   3|   3|   2|
|   3|   0|   1|   4|   4|
|   4|   0|   3|   3|   3|
+----+----+----+----+----+
only showing top 5 rows

这个例子很简单。 In 包含 2 个窗口规范和 4 个基于它们的独立列表达式:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")

expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]

withColumn - 4 次随机播放

如果 withColumn 与交替窗口规范一起使用,则 DAG 会创建不必要的随机播放:

df.withColumn("col_w1_1", col_w1_1)\
  .withColumn("col_w2_1", col_w2_1)\
  .withColumn("col_w1_2", col_w1_2)\
  .withColumn("col_w2_2", col_w2_2)\
  .explain()

== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
                  +- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
                           +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

选择 - 2 次随机播放

如果所有列都以select 传递,则 DAG 是正确的。

df.select("*", *expr).explain()

== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

问题

有一些关于为什么要避免使用withColumn 的现有信息,但是它们主要关注的是多次调用withColumn,并且它们没有解决偏离DAG 的问题(请参阅here 和here )。有谁知道为什么 DAG 在 withColumnselect 之间有所不同? Spark 的优化算法应该适用于任何情况,并且不应该依赖于不同的方式来表达完全相同的东西。

提前致谢。

【问题讨论】:

【参考方案1】:

何时使用嵌套的 withColumns 和窗口函数?

假设我想做:

w1 = ...rangeBetween(-300, 0)
w2 = ...rowsBetween(-1,0)

(df.withColumn("some1", col(f.max("original1").over(w1))
   .withColumn("some2", lag("some1")).over(w2)).show()

即使数据集非常小,我也会遇到很多内存问题和大量溢出。如果我使用 select 而不是 withColumn 做同样的事情,它的执行速度会更快。

df.select(
    f.max(col("original1")).over(w1).alias("some1"),
    f.lag("some1")).over(w2)
).show()

【讨论】:

感谢您的回答!我会对一个最小的可重现示例感兴趣,该示例允许观察您描述的内存问题和高溢出。【参考方案2】:

这看起来像是由withColumn 引起的内部投影的结果。在 Spark 文档中记录了 here

官方建议是按照 Jay 的建议去做,而不是在处理多列时做一个 select

【讨论】:

以上是关于Spark DAG 与“withColumn”和“select”不同的主要内容,如果未能解决你的问题,请参考以下文章

Spark 中的 Join 和 withColumn 异常

Spark withColumn 和 where 执行顺序

Spark是否会通过数据传递多个withColumn?

在 spark scala 中为 withcolumn 编写通用函数

spark dataFrame withColumn

在 Spark Scala 中使用“withColumn”函数的替代方法