PySpark - 如何使用连接更新 Dataframe?

Posted

技术标签:

【中文标题】PySpark - 如何使用连接更新 Dataframe?【英文标题】:PySpark - how to update Dataframe by using join? 【发布时间】:2019-10-14 13:10:47 【问题描述】:

我有一个数据框:

id,value
1,11
2,22
3,33

还有另一个数据框 b:

id,value
1,123
3,345

我想用来自 b 的所有匹配值更新数据框 a(基于列 'id')。

最终数据框“c”将是:

id,value
1,123
2,22
3,345

如何使用 datafame 连接(或其他方法)来实现?

试过了:

a.join(b, a.id == b.id, "inner").drop(a.value)

给出(不需要的输出):

+---+---+-----+
| id| id|value|
+---+---+-----+
|  1|  1|  123|
|  3|  3|  345|
+---+---+-----+

谢谢。

【问题讨论】:

它会投射你,但它会给你结果。 scala> dfd.join(df.select("id"),Seq("id"),"inner").union(df.join(dfd,Seq("id"),"left_anti")).orderBy( "id").show 【参考方案1】:

我认为没有更新功能。但这应该可行:

import pyspark.sql.functions as F

df1.join(df2, df1.id == df2.id, "left_outer") \
   .select(df1.id, df2.id, F.when(df2.value.isNull(), df1.value).otherwise(df2.value).alias("value")))

【讨论】:

这很接近通过查看逻辑但得到错误raise TypeError("Column is not iterable") column() 替换为select()

以上是关于PySpark - 如何使用连接更新 Dataframe?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何使用带有 JDBC 连接的 MySQL 函数?

如何使用分隔符连接 PySpark 中的多列?

我们如何在 pyspark 的不同模块中使用相同的连接数据框用法

如何使用 Hive 仓库连接器在 pyspark 中执行 HQL 文件

如何使用 PySpark 对两个 RDD 进行完全外连接?

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来