pyspark 在 udf 中使用数据框
Posted
技术标签:
【中文标题】pyspark 在 udf 中使用数据框【英文标题】:pyspark use dataframe inside udf 【发布时间】:2018-05-01 20:18:20 【问题描述】:我有两个数据框df1
+---+---+----------+
| n|val| distances|
+---+---+----------+
| 1| 1|0.27308652|
| 2| 1|0.24969208|
| 3| 1|0.21314497|
+---+---+----------+
和df2
+---+---+----------+
| x1| x2| w|
+---+---+----------+
| 1| 2|0.03103427|
| 1| 4|0.19012526|
| 1| 10|0.26805446|
| 1| 8|0.26825935|
+---+---+----------+
我想向df1
添加一个名为gamma
的新列,其中将包含df2
时df1.n == df2.x1 OR df1.n == df2.x2
时w
值的总和
我尝试使用 udf,但显然从不同的数据框中选择将不起作用,因为值应该在计算之前确定
gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d"%(n,n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))
有什么方法可以在不使用循环的情况下使用join
或groupby
来实现吗?
【问题讨论】:
df1.join(df2, (df1.n == df2.x1) | (df1.n == df2.x2)).groupBy(df1.n).sum("w")
?
【参考方案1】:
您不能在 udf
中引用 DataFrame。正如您所提到的,这个问题最好使用join
来解决。
IIUC,您正在寻找类似的东西:
from pyspark.sql import Window
import pyspark.sql.functions as F
df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
.select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
.distinct()\
.show()
#+---+---+----------+----------+
#| n|val| distances| gamma|
#+---+---+----------+----------+
#| 1| 1|0.27308652|0.75747334|
#| 3| 1|0.21314497| null|
#| 2| 1|0.24969208|0.03103427|
#+---+---+----------+----------+
或者,如果您更熟悉 pyspark-sql
语法,您可以注册临时表并执行以下操作:
df1.registerTempTable("df1")
df2.registerTempTable("df2")
sqlCtx.sql(
"SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
"FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#| n|val| distances| gamma|
#+---+---+----------+----------+
#| 1| 1|0.27308652|0.75747334|
#| 3| 1|0.21314497| null|
#| 2| 1|0.24969208|0.03103427|
#+---+---+----------+----------+
说明
在这两种情况下,我们都在做 left join 的 df1
到 df2
。这将保留 df1
中的所有行,无论是否存在匹配项。
join 子句是您在问题中指定的条件。所以df2
中的所有行,其中x1
或x2
等于n
将被加入。
接下来选择左侧表中的所有行,加上我们按(分区)n
分组,并对w
的值求和。对于n
的每个值,这将获得与连接条件匹配的所有行的总和。
最后我们只返回不同的行来消除重复。
【讨论】:
如果df2
的维度远大于df1
,会不会对df2
中的所有值取和?
总和将超过 df2 中与分区上的连接条件匹配的所有值。这不适合你吗?如果有,你能举个例子吗?
我不确定,在 pyspark 中有点新。我只是想弄清楚你的答案是如何工作的。以上是关于pyspark 在 udf 中使用数据框的主要内容,如果未能解决你的问题,请参考以下文章