Spark withColumn and where execution order

Posted: 2020-01-16 11:42:21

Question:

I have a Spark query that reads a large amount of Parquet data from S3, filters it, and adds a column computed as regexp_extract(input_file_name, ...), which I assume is a relatively heavy operation (if it is applied before the filtering rather than after).

The whole query looks like this:

val df = spark
    .read
    .option("mergeSchema", "true")
    .parquet("s3://bucket/path/date=2020-01-15,6/clientType=EXTENSION_CHROME/type=ACCEPT,IGNORE*/")
    .where(...)
    .withColumn("type", regexp_extract(input_file_name, "type=([^/]+)", 1))
    .repartition(300)
    .cache()

df.count()

Is withColumn executed after the where, or before it? Does that depend on the order in which I write them? And what happens if my where clause uses a column added by withColumn?
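As a side note, the pattern passed to regexp_extract can be sanity-checked in plain Scala against a sample path (the path below is a made-up example following the layout above, not a real file):

```scala
import scala.util.matching.Regex

// Hypothetical file path following the S3 layout in the query above.
val path = "s3://bucket/path/date=2020-01-15,6/clientType=EXTENSION_CHROME/type=ACCEPT/part-00000.parquet"

// The same pattern passed to regexp_extract; group 1 captures the value after "type=".
val pattern: Regex = "type=([^/]+)".r

// The match is case-sensitive, so "clientType=" is skipped.
val extracted = pattern.findFirstMatchIn(path).map(_.group(1))
println(extracted) // Some(ACCEPT)
```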


Answer 1:

withColumn and filter are executed in the order in which they are called. The query plan shows this; read the plan from the bottom up.

val employees = spark.createDataFrame(Seq(("E1",100.0), ("E2",200.0),("E3",300.0))).toDF("employee","salary")

employees.withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).filter(col("column1")==="poor").explain(true)

Plan: the Project happens first, then the Filter.

== Parsed Logical Plan ==
'Filter ('column1 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Filter (column1#8 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

Code that filters first, then adds the new column:

employees.filter(col("employee")==="E1").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).explain(true)

Plan: Filter first, then Project.

== Parsed Logical Plan ==
'Project [employee#4, salary#5, CASE WHEN ('salary > 200) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]
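These plans also explain the performance concern in the question: when the filter is called first, the expensive column expression only runs on the rows that survive it. A minimal plain-Scala analogy (eager collections with a call counter; this is an illustration, not Spark itself):

```scala
case class Employee(name: String, salary: Double)

val staff = Seq(Employee("E1", 100.0), Employee("E2", 200.0), Employee("E3", 300.0))

// Stand-in for an expensive expression such as regexp_extract.
var calls = 0
def heavyLabel(e: Employee): String = { calls += 1; if (e.salary > 200) "rich" else "poor" }

// Column first, then filter: the heavy function runs on all 3 rows.
calls = 0
staff.map(e => (e.name, heavyLabel(e))).filter(_._2 == "poor")
val columnFirstCalls = calls // 3

// Filter first, then column: the heavy function runs only on the 1 surviving row.
calls = 0
staff.filter(_.name == "E1").map(e => (e.name, heavyLabel(e)))
val filterFirstCalls = calls // 1
```

(In Spark itself the optimizer may also push a filter below a projection on its own, but it cannot do so when the predicate depends on the newly added column, so call order still matters.)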

More evidence: filtering on the column before it has been added fails (as expected):

employees.filter(col("column1")==="poor").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`column1`' given input columns: [employee, salary];;
'Filter ('column1 = poor)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
   +- LocalRelation [_1#0, _2#1]
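The fix for that error is simply to add the column before filtering on it. As a rough plain-Scala analogy (rows modeled as Maps, where a lookup on a column that has not been added yet fails, much like the analyzer's unresolved-column error; this is an illustration, not Spark API):

```scala
import scala.util.Try

val rows = Seq(
  Map[String, Any]("employee" -> "E1", "salary" -> 100.0),
  Map[String, Any]("employee" -> "E2", "salary" -> 300.0)
)

// Stand-in for withColumn: derive "column1" from "salary".
def addColumn1(r: Map[String, Any]): Map[String, Any] =
  r + ("column1" -> (if (r("salary").asInstanceOf[Double] > 200) "rich" else "poor"))

// Filtering on "column1" before it exists throws, analogous to the AnalysisException above.
val failsFirst = Try(rows.filter(_("column1") == "poor")).isFailure
println(failsFirst) // true

// Add the column first, then filter on it.
val poor = rows.map(addColumn1).filter(_("column1") == "poor")
println(poor.map(_("employee"))) // List(E1)
```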

