pyspark 在 udf 中使用数据框

Posted

技术标签:

【中文标题】pyspark 在 udf 中使用数据框【英文标题】:pyspark use dataframe inside udf 【发布时间】:2018-05-01 20:18:20 【问题描述】:

我有两个数据框df1

+---+---+----------+
|  n|val| distances|
+---+---+----------+
|  1|  1|0.27308652|
|  2|  1|0.24969208|
|  3|  1|0.21314497|
+---+---+----------+

df2

+---+---+----------+
| x1| x2|         w|
+---+---+----------+
|  1|  2|0.03103427|
|  1|  4|0.19012526|
|  1| 10|0.26805446|
|  1|  8|0.26825935|
+---+---+----------+

我想向df1 添加一个名为gamma 的新列,其中将包含df2df1.n == df2.x1 OR df1.n == df2.x2w 值的总和

我尝试使用 udf,但显然从不同的数据框中选择将不起作用,因为值应该在计算之前确定

gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d"%(n,n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))

有什么方法可以在不使用循环的情况下使用joingroupby 来实现吗?

【问题讨论】:

df1.join(df2, (df1.n == df2.x1) | (df1.n == df2.x2)).groupBy(df1.n).sum("w")? 【参考方案1】:

您不能在 udf 中引用 DataFrame。正如您所提到的,这个问题最好使用join 来解决。

IIUC,您正在寻找类似的东西:

from pyspark.sql import Window
import pyspark.sql.functions as F

df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
    .select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
    .distinct()\
    .show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

或者,如果您更熟悉 pyspark-sql 语法,您可以注册临时表并执行以下操作:

df1.registerTempTable("df1")
df2.registerTempTable("df2")

sqlCtx.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

说明

在这两种情况下,我们都在做 left join 的 df1df2。这将保留 df1 中的所有行,无论是否存在匹配项。

join 子句是您在问题中指定的条件。所以df2 中的所有行,其中x1x2 等于n 将被加入。

接下来选择左侧表中的所有行,加上我们按(分区)n 分组,并对w 的值求和。对于n 的每个值,这将获得与连接条件匹配的所有行的总和。

最后我们只返回不同的行来消除重复。

【讨论】:

如果df2的维度远大于df1,会不会对df2中的所有值取和? 总和将超过 df2 中与分区上的连接条件匹配的所有值。这不适合你吗?如果有,你能举个例子吗? 我不确定,在 pyspark 中有点新。我只是想弄清楚你的答案是如何工作的。

以上是关于pyspark 在 udf 中使用数据框的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据框 UDF 异常处理

如何创建 Pyspark UDF 以向数据框添加新列

PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加

带有数据框查询的 PySpark UDF 函数?

pyspark:将多个数据框字段传递给 udf

将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误