spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行

Posted

技术标签:

【中文标题】spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行【英文标题】:spark: merge two dataframes, if ID duplicated in two dataframes, the row in df1 overwrites the row in df2 【发布时间】:2019-09-08 02:42:41 【问题描述】:

有两个数据框:df1 和 df2 具有相同的架构。 ID 是主键。

我需要合并两个 df1 和 df2。这可以通过union 完成,但有一个特殊要求:如果 df1 和 df2 中存在具有相同 ID 的重复行。我需要在 df1 中保留一个。

df1:

ID col1 col2
1  AA   2019
2  B    2018

df2:

ID col1 col2
1  A    2019
3  C    2017

我需要以下输出:

df1:

ID col1 col2
1  AA   2019
2  B    2018
3  C    2017

如何做到这一点?谢谢。我认为可以注册两个 tmp 表,进行完全连接并使用coalesce。但我不喜欢这种方式,因为实际上大约有 40 列,而不是上面示例中的 3 列。

【问题讨论】:

你能展示一些示例输入和预期输出吗? 【参考方案1】:

鉴于两个 DataFrame 具有相同的架构,您可以简单地将 df1df2df1left_anti 连接:

df1.union(df2.join(df1, Seq("ID"), "left_anti")).show
// +---+---+----+
// | ID|co1|col2|
// +---+---+----+
// |  1| AA|2019|
// |  2|  B|2018|
// |  3|  C|2017|
// +---+---+----+

【讨论】:

非常简洁的解决方案。【参考方案2】:

执行此操作的一种方法是,union 使用标识符列指定数据帧,然后使用它对来自 df1 的行进行优先级排序,并使用类似 row_number 的函数。

此处显示 PySpark SQL 解决方案。

from pyspark.sql.functions import lit,row_number,when
from pyspark.sql import Window
df1_with_identifier = df1.withColumn('identifier',lit('df1'))
df2_with_identifier = df2.withColumn('identifier',lit('df2'))
merged_df = df1_with_identifier.union(df2_with_identifier)
#Define the Window with the desired ordering
w = Window.partitionBy(merged_df.id).orderBy(when(merged_df.identifier == 'df1',1).otherwise(2))
result = merged_df.withColumn('rownum',row_number().over(w))
result.select(result.rownum == 1).show()

df1 上带有left join 的解决方案可能要简单得多,但您必须编写多个coalesces。

【讨论】:

以上是关于spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行的主要内容,如果未能解决你的问题,请参考以下文章

如果日期介于第二个数据帧中的两个日期之间,则 r 标记第一个数据帧中的行

向前搜索时合并两个数据帧

合并一个值在另外两个之间的熊猫数据框[重复]

如何根据原始数据帧中的总行数将数据帧拆分为两个数据帧

合并两个不同长度的python pandas数据帧,但将所有行保留在输出数据帧中

如何从两个合并的数据帧中选择完成之前和之后的特定时间间隔?