spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行
Posted
技术标签:
【中文标题】spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行【英文标题】:spark: merge two dataframes, if ID duplicated in two dataframes, the row in df1 overwrites the row in df2 【发布时间】:2019-09-08 02:42:41 【问题描述】:有两个数据框:df1 和 df2 具有相同的架构。 ID 是主键。
我需要合并两个 df1 和 df2。这可以通过union
完成,但有一个特殊要求:如果 df1 和 df2 中存在具有相同 ID 的重复行。我需要在 df1 中保留一个。
df1:
ID col1 col2
1 AA 2019
2 B 2018
df2:
ID col1 col2
1 A 2019
3 C 2017
我需要以下输出:
df1:
ID col1 col2
1 AA 2019
2 B 2018
3 C 2017
如何做到这一点?谢谢。我认为可以注册两个 tmp 表,进行完全连接并使用coalesce
。但我不喜欢这种方式,因为实际上大约有 40 列,而不是上面示例中的 3 列。
【问题讨论】:
你能展示一些示例输入和预期输出吗? 【参考方案1】:鉴于两个 DataFrame 具有相同的架构,您可以简单地将 df1
与 df2
和 df1
的 left_anti
连接:
df1.union(df2.join(df1, Seq("ID"), "left_anti")).show
// +---+---+----+
// | ID|co1|col2|
// +---+---+----+
// | 1| AA|2019|
// | 2| B|2018|
// | 3| C|2017|
// +---+---+----+
【讨论】:
非常简洁的解决方案。【参考方案2】:执行此操作的一种方法是,union
使用标识符列指定数据帧,然后使用它对来自 df1
的行进行优先级排序,并使用类似 row_number
的函数。
此处显示 PySpark SQL 解决方案。
from pyspark.sql.functions import lit,row_number,when
from pyspark.sql import Window
df1_with_identifier = df1.withColumn('identifier',lit('df1'))
df2_with_identifier = df2.withColumn('identifier',lit('df2'))
merged_df = df1_with_identifier.union(df2_with_identifier)
#Define the Window with the desired ordering
w = Window.partitionBy(merged_df.id).orderBy(when(merged_df.identifier == 'df1',1).otherwise(2))
result = merged_df.withColumn('rownum',row_number().over(w))
result.select(result.rownum == 1).show()
在df1
上带有left join
的解决方案可能要简单得多,但您必须编写多个coalesce
s。
【讨论】:
以上是关于spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行的主要内容,如果未能解决你的问题,请参考以下文章
如果日期介于第二个数据帧中的两个日期之间,则 r 标记第一个数据帧中的行