PySpark - 将另一列的值作为 spark 函数的参数传递
Posted
技术标签:
【中文标题】PySpark - 将另一列的值作为 spark 函数的参数传递【英文标题】:PySpark - pass a value from another column as the parameter of spark function 【发布时间】:2020-06-19 21:34:34 【问题描述】:我有一个 spark 数据框,看起来像这样,其中 expr 是 SQL/Hive 过滤器表达式。
+-----------------------------------------+
|expr |var1 |var2 |
+-------------------------+---------+-----+
|var1 > 7 |9 |0 |
|var1 > 7 |9 |0 |
|var1 > 7 |9 |0 |
|var1 > 7 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 2 AND var2 >= 0 |9 |0 |
+-------------------------+---------+-----+
我想将此数据帧转换为下面的数据帧,其中 flag 是在评估“expr”列中的表达式后找到的布尔值
+---------------------------------------------------+
|expr |var1 |var2 |flag |
+-------------------------+---------+-----+---------+
|var1 > 7 |9 |0 | True |
|var1 > 7 |9 |0 | True |
|var1 > 7 |9 |0 | True |
|var1 > 7 |9 |0 | True |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 2 AND var2 >= 0 |9 |0 | . |
+-------------------------+---------+-----+---------+
我尝试过这样使用 expr 函数:
df.withColumn('flag', expr(col('expr')))
它会按预期失败,因为 expr 函数需要一个字符串作为参数。
我想到的另一个想法是制作一个 UDF 并将“expr”列的值传递给它,但这不允许我使用 pyspark 的 expr 函数,因为 UDF 都是非火花代码。
我的方法应该是什么?请问有什么建议吗?
【问题讨论】:
简短的回答你可以做到这一点,但使用窗口功能。您能否解释您要解决的更大问题或上传更好的示例数据,即一个包含一列到 groupby 的示例数据 如何使用窗口函数 @Dee 做到这一点。我认为UDF是唯一的解决方案。 Mappartition 将是另一种解决方案,可能会更快 我需要 OP 来解释真实数据是什么样的,或者这正在解决什么更大的问题 @Manish 现在查看我的答案 @Dee 假设我有两个数据帧,一个包含所有真实数据,另一个包含所有过滤规则(如上所述)以应用于真实数据。对于第一个数据帧中的每一行,只有一个过滤规则为真,根据该规则将为该行派生更多值。为此,我正在做的是在 2 个数据帧之间进行交叉连接(这是我在上面示例中显示的数据帧),然后尝试评估每一行的每个规则。让我知道解释是否清楚? 【参考方案1】:所以这是一个没有 UDF 的 PySpark 解决方案。在 Scala 中,我相信您可以使用具有相同逻辑的 map 或 foldleft。
exprs = df.select('expr').distinct().collect()[0][0]
for ex in exprs:
df = df.withColumn('test', when(col('expr') == lit(ex), expr(ex)))
df.show()
+--------------------+----+----+----+
| expr|var1|var2|test|
+--------------------+----+----+----+
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 2 AND var2...| 9| 0|null|
+--------------------+----+----+----+
我应该指出,我不明白为什么 OP 想要这样做,如果他们为问题提供更好的上下文,我确信有更好的方法。
对 DF 进行迭代并不是最有效的做法,但在这种情况下,它实际上会运行得非常快,因为它不会对数据进行迭代,因此 Spark 实际上会在一个计划中执行它。此外,单个 collect() 仅在 20+ 百万 DF 上增加 2 秒的执行时间。
更新:
我现在更好地理解了这个问题,这会更快,因为 Spark 会在将它们合并到一列之前一次计算所有过滤器。
# Tip: perform the collect statement on the smaller DF that contains the filter expressions
exprs = df.select('expr').distinct().collect()[0][0]
df = df.withColumn('filter',
coalesce(*[when(col('expr') == lit(ex), expr(ex)) for ex in exprs])
)
df.show()
+--------------------+----+----+------+
| expr|var1|var2|filter|
+--------------------+----+----+------+
| var1 > 7| 9| 0|true |
| var1 > 7| 9| 0|true |
| var1 > 7| 9| 0|true |
| var1 > 7| 9| 0|true |
|var1 = 3 AND var2...| 9| 0|null |
|var1 = 3 AND var2...| 9| 0|null |
|var1 = 3 AND var2...| 9| 0|null |
|var1 = 3 AND var2...| 9| 0|null |
|var1 = 2 AND var2...| 9| 0|null |
+--------------------+----+----+------+
【讨论】:
【参考方案2】:不是udf
val exprs5 = sourceDF.select('expr).distinct().as[String].collect()
val d1 = exprs5.map(i =>
val df = sourceDF.filter('expr.equalTo(i))
df.withColumn("flag", expr(i))
)
val d2 = d1.reduce(_ union _)
udf
package spark
import org.apache.spark.sql.DataFrame, SparkSession
object Filter extends App
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
val sourceDF = Seq(("var1 > 7", 9, 0),
("var1 > 7", 9, 0),
("var1 > 7", 9, 0),
("var1 > 7", 9, 0),
("var1 = 3 AND var2 >= 0", 9, 0),
("var1 = 3 AND var2 >= 0", 9, 0),
("var1 = 3 AND var2 >= 0", 9, 0),
("var1 = 3 AND var2 >= 0", 9, 0),
("var1 = 2 AND var2 >= 0", 9, 0)).toDF("expr", "var1","var2")
import org.apache.spark.sql.functions._
val fCheck = udf((expr: String, val1: Int, val2: Int) =>
expr.split(" ") match
case Array(vr, z, vl) if (vr == "var1" && z == ">") => Some(val1 > vl.toInt)
case Array(vr1, z1, vl1, logic1, vr2, z2, vl2)
if (vr1 == "var1") && (z1 == "=") && (logic1 == "AND") && (vr2 == "var2") && (z2 == ">=")
=> Some((val1 == vl1.toInt ) && (val2 >= vl2.toInt))
case _ => None
)
val resultDF = sourceDF.withColumn("flag", lit(fCheck('expr, 'var1, 'var2)))
resultDF.show(false)
// +----------------------+----+----+-----+
// |expr |var1|var2|flag |
// +----------------------+----+----+-----+
// |var1 > 7 |9 |0 |true |
// |var1 > 7 |9 |0 |true |
// |var1 > 7 |9 |0 |true |
// |var1 > 7 |9 |0 |true |
// |var1 = 3 AND var2 >= 0|9 |0 |false|
// |var1 = 3 AND var2 >= 0|9 |0 |false|
// |var1 = 3 AND var2 >= 0|9 |0 |false|
// |var1 = 3 AND var2 >= 0|9 |0 |false|
// |var1 = 2 AND var2 >= 0|9 |0 |false|
// +----------------------+----+----+-----+
【讨论】:
以上是关于PySpark - 将另一列的值作为 spark 函数的参数传递的主要内容,如果未能解决你的问题,请参考以下文章
检查一列中的值是不是存在于另一列中,如果存在,则将另一列中的值复制到新列中