PySpark - 将另一列的值作为 spark 函数的参数传递

Posted

技术标签:

【中文标题】PySpark - 将另一列的值作为 spark 函数的参数传递【英文标题】:PySpark - pass a value from another column as the parameter of spark function 【发布时间】:2020-06-19 21:34:34 【问题描述】:

我有一个 spark 数据框,看起来像这样,其中 expr 是 SQL/Hive 过滤器表达式。

+-----------------------------------------+
|expr                     |var1     |var2 |
+-------------------------+---------+-----+
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 2 AND var2 >= 0   |9        |0    |
+-------------------------+---------+-----+

我想将此数据帧转换为下面的数据帧,其中 flag 是在评估“expr”列中的表达式后找到的布尔值

+---------------------------------------------------+
|expr                     |var1     |var2 |flag     |
+-------------------------+---------+-----+---------+
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 2 AND var2 >= 0   |9        |0    |     .   |
+-------------------------+---------+-----+---------+

我尝试过这样使用 expr 函数:

df.withColumn('flag', expr(col('expr')))

它会按预期失败,因为 expr 函数需要一个字符串作为参数。

我想到的另一个想法是制作一个 UDF 并将“expr”列的值传递给它,但这不允许我使用 pyspark 的 expr 函数,因为 UDF 都是非火花代码。

我的方法应该是什么?请问有什么建议吗?

【问题讨论】:

简短的回答你可以做到这一点,但使用窗口功能。您能否解释您要解决的更大问题或上传更好的示例数据,即一个包含一列到 groupby 的示例数据 如何使用窗口函数 @Dee 做到这一点。我认为UDF是唯一的解决方案。 Mappartition 将是另一种解决方案,可能会更快 我需要 OP 来解释真实数据是什么样的,或者这正在解决什么更大的问题 @Manish 现在查看我的答案 @Dee 假设我有两个数据帧,一个包含所有真实数据,另一个包含所有过滤规则(如上所述)以应用于真实数据。对于第一个数据帧中的每一行,只有一个过滤规则为真,根据该规则将为该行派生更多值。为此,我正在做的是在 2 个数据帧之间进行交叉连接(这是我在上面示例中显示的数据帧),然后尝试评估每一行的每个规则。让我知道解释是否清楚? 【参考方案1】:

所以这是一个没有 UDF 的 PySpark 解决方案。在 Scala 中,我相信您可以使用具有相同逻辑的 map 或 foldleft。

exprs = df.select('expr').distinct().collect()[0][0]

for ex in exprs:
    df = df.withColumn('test', when(col('expr') == lit(ex), expr(ex)))
    
df.show()
+--------------------+----+----+----+
|                expr|var1|var2|test|
+--------------------+----+----+----+
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 2 AND var2...|   9|   0|null|
+--------------------+----+----+----+

我应该指出,我不明白为什么 OP 想要这样做,如果他们为问题提供更好的上下文,我确信有更好的方法。

对 DF 进行迭代并不是最有效的做法,但在这种情况下,它实际上会运行得非常快,因为它不会对数据进行迭代,因此 Spark 实际上会在一个计划中执行它。此外,单个 collect() 仅在 20+ 百万 DF 上增加 2 秒的执行时间。


更新:

我现在更好地理解了这个问题,这会更快,因为 Spark 会在将它们合并到一列之前一次计算所有过滤器。

# Tip: perform the collect statement on the smaller DF that contains the filter expressions

exprs = df.select('expr').distinct().collect()[0][0]

df = df.withColumn('filter',
              coalesce(*[when(col('expr') == lit(ex), expr(ex)) for ex in exprs])
             )
df.show()
+--------------------+----+----+------+
|                expr|var1|var2|filter|
+--------------------+----+----+------+
|            var1 > 7|   9|   0|true  |
|            var1 > 7|   9|   0|true  |
|            var1 > 7|   9|   0|true  |
|            var1 > 7|   9|   0|true  |
|var1 = 3 AND var2...|   9|   0|null  |
|var1 = 3 AND var2...|   9|   0|null  |
|var1 = 3 AND var2...|   9|   0|null  |
|var1 = 3 AND var2...|   9|   0|null  |
|var1 = 2 AND var2...|   9|   0|null  |
+--------------------+----+----+------+

【讨论】:

【参考方案2】:

不是udf

  val exprs5 =   sourceDF.select('expr).distinct().as[String].collect()
  val d1 = exprs5.map(i => 
    val df = sourceDF.filter('expr.equalTo(i))
    df.withColumn("flag", expr(i))
  )
  val d2 = d1.reduce(_ union _)

udf

package spark

import org.apache.spark.sql.DataFrame, SparkSession

object Filter extends App 

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val sourceDF = Seq(("var1 > 7", 9, 0),
  ("var1 > 7", 9, 0),
  ("var1 > 7", 9, 0),
  ("var1 > 7", 9, 0),
  ("var1 = 3 AND var2 >= 0", 9, 0),
  ("var1 = 3 AND var2 >= 0", 9, 0),
  ("var1 = 3 AND var2 >= 0", 9, 0),
  ("var1 = 3 AND var2 >= 0", 9, 0),
  ("var1 = 2 AND var2 >= 0", 9, 0)).toDF("expr", "var1","var2")

  import org.apache.spark.sql.functions._

  val fCheck = udf((expr: String, val1: Int, val2: Int) => 
    expr.split(" ") match
      case Array(vr, z, vl) if (vr == "var1" && z == ">") => Some(val1 > vl.toInt)
      case Array(vr1, z1,  vl1, logic1, vr2, z2, vl2)
        if (vr1 == "var1") && (z1 == "=") && (logic1 == "AND") && (vr2 == "var2") && (z2 == ">=")
      => Some((val1 == vl1.toInt ) && (val2 >= vl2.toInt))
      case _ => None
    
  )

  val resultDF = sourceDF.withColumn("flag", lit(fCheck('expr, 'var1, 'var2)))

  resultDF.show(false)
//  +----------------------+----+----+-----+
//  |expr                  |var1|var2|flag |
//  +----------------------+----+----+-----+
//  |var1 > 7              |9   |0   |true |
//  |var1 > 7              |9   |0   |true |
//  |var1 > 7              |9   |0   |true |
//  |var1 > 7              |9   |0   |true |
//  |var1 = 3 AND var2 >= 0|9   |0   |false|
//  |var1 = 3 AND var2 >= 0|9   |0   |false|
//  |var1 = 3 AND var2 >= 0|9   |0   |false|
//  |var1 = 3 AND var2 >= 0|9   |0   |false|
//  |var1 = 2 AND var2 >= 0|9   |0   |false|
//  +----------------------+----+----+-----+


【讨论】:

以上是关于PySpark - 将另一列的值作为 spark 函数的参数传递的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何根据另一列的值填充空值

检查一列中的值是不是存在于另一列中,如果存在,则将另一列中的值复制到新列中

PySpark:使用一列索引另一列(两列的udf?)

根据另一列的元素从 pyspark 数组中删除元素

Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串

如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况