PySpark - 如何使用连接更新 Dataframe?
Posted
技术标签:
【中文标题】PySpark - 如何使用连接更新 Dataframe?【英文标题】:PySpark - how to update Dataframe by using join? 【发布时间】:2019-10-14 13:10:47 【问题描述】:我有一个数据框:
id,value
1,11
2,22
3,33
还有另一个数据框 b:
id,value
1,123
3,345
我想用来自 b 的所有匹配值更新数据框 a(基于列 'id')。
最终数据框“c”将是:
id,value
1,123
2,22
3,345
如何使用 datafame 连接(或其他方法)来实现?
试过了:
a.join(b, a.id == b.id, "inner").drop(a.value)
给出(不需要的输出):
+---+---+-----+
| id| id|value|
+---+---+-----+
| 1| 1| 123|
| 3| 3| 345|
+---+---+-----+
谢谢。
【问题讨论】:
它会投射你,但它会给你结果。 scala> dfd.join(df.select("id"),Seq("id"),"inner").union(df.join(dfd,Seq("id"),"left_anti")).orderBy( "id").show 【参考方案1】:我认为没有更新功能。但这应该可行:
import pyspark.sql.functions as F
df1.join(df2, df1.id == df2.id, "left_outer") \
.select(df1.id, df2.id, F.when(df2.value.isNull(), df1.value).otherwise(df2.value).alias("value")))
【讨论】:
这很接近通过查看逻辑但得到错误raise TypeError("Column is not iterable")
将column()
替换为select()
以上是关于PySpark - 如何使用连接更新 Dataframe?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark:如何使用带有 JDBC 连接的 MySQL 函数?
我们如何在 pyspark 的不同模块中使用相同的连接数据框用法