Pyspark 基于另一个类似的数据框添加或删除数据框中的行
Posted
技术标签:
【中文标题】Pyspark 基于另一个类似的数据框添加或删除数据框中的行【英文标题】:Pyspark add or remove rows in a dataframe based on another similar dataframe 【发布时间】:2021-12-30 18:43:50 【问题描述】:假设我有两个具有相同架构的数据框 DF1 和 DF2。 我想做的是:
对于 DF1 中的每一行,
如果 DF2 中存在 DF1.uniqueId 并且类型是新的,则添加到 DF2 并重复计数。
如果 DF2 中存在 DF1.uniqueId 且类型为旧,则将 DF2 类型更改为 DF1 类型(旧)。
如果 DF2 中不存在 DF1.uniqueId 并且类型是新的,则在 DF2 中添加一个新行。
如果 DF1.uniqueId 在 DF2 中不存在并且类型是旧的,则将该行移动到新表 -DF3
即。如果表格如下所示,则结果或更新后的 DF2 应该类似于下面的 resultDF2 表格
DF1
+----------+--------------------------+
|UniqueID |type_ |
+----------+--------------------------+
|1 |new |
|1 |new |
|1 |new |
|2 |old |
|1 |new |
+----------+--------------------------+
DF2
+----------+--------------------------+
|UniqueID |type_ |
+----------+--------------------------+
| | |
+----------+--------------------------+
结果DF2
+----------+--------------------------++----------+--------------------------+
|UniqueID |type_ | repeatCount |
+----------+--------------------------++----------+--------------------------+
| 1 | new | 3 |
+----------+--------------------------++----------+--------------------------+
结果DF3
+----------+--------------------------++----------+--------------------------+
|UniqueID |type_ | repeatCount |
+----------+--------------------------++----------+--------------------------+
| 1 | old | 0 |
+----------+--------------------------++----------+--------------------------+
** 如果只有一个条目,repeatCount 为零。
我正在尝试使用 pyspark 来实现这一点。 考虑到我的两个表都在内存中,任何人都可以向我建议如何实现这一点。
【问题讨论】:
你能再澄清一下条件吗?如果 uniqueID 在数据框中既有新的type_
又有旧的type_
,会发生什么情况?
很抱歉没有提及,如果数据帧(DF1)具有相同uniqueId的新旧type_,那么它们将作为一对被取消。
【参考方案1】:
可以通过以下方式获得所需的输出:
-
在
UniqueId
上分组df1
并获取repeatCount
,在此操作期间删除具有old
和new
type_
的UniqueId
。
在第 1 步的数据框和 df2
之间应用 Full Join
。
从连接结果中,删除df2
中不存在df.UniqueId
且df1.type_
为old
的行。
最后选择UniqueID
、type_
和repeatCount
。
from pyspark.sql import functions as F
data = [(1, "new",), # Not exists and new
(1, "new",),
(1, "new",),
(2, "old",), # Not exists and old
(1, "new",),
(3, "old",), # cancel out
(3, "new",), # cancel out
(4, "new",), # one entry count zero example
(5, "new",), # Exists and new
(6, "old",), ] # Exists and old
df1 = spark.createDataFrame(data, ("UniqueID", "type_", ))
df2 = spark.createDataFrame([(5, "new", ), (6, "new", ), ], ("UniqueID", "type_", ))
df1_grouped = (df1.groupBy("UniqueID").agg(F.collect_set("type_").alias("types_"),
(F.count("type_") - F.lit(1)).alias("repeatCount"))
.filter(F.size(F.col("types_")) == 1) # when more than one type of `type_` is present they cancel out
.withColumn("type_", F.col("types_")[0])
.drop("types_")
)
id_not_exists_old = (df2["UniqueID"].isNull() & (df1_grouped["type_"] == F.lit("old")))
(df1_grouped.join(df2, df1_grouped["UniqueID"] == df2["UniqueID"], "full")
.filter(~(id_not_exists_old))
.select(df1_grouped["UniqueID"], df1_grouped["type_"], "repeatCount")
).show()
"""
+--------+-----+-----------+
|UniqueID|type_|repeatCount|
+--------+-----+-----------+
| 1| new| 3|
| 4| new| 0|
| 5| new| 0|
| 6| old| 0|
+--------+-----+-----------+
"""
【讨论】:
感谢@nithish,一件事是,由于 df2 表中已经存在唯一 ID 5,因此最终结果需要对 uniqueId5 进行重复计数 1。 @AlbinChandy 所以对于df1
repeatCount
是count(UniqueID) - 1
但对于df2
它只是count(UniqueID)
? df2
也会取消吗?
问题是 df2 是一个包含所有类型“新”项目的主表,df1 是周期性重复的批次,在组合重复的项目后将插入到 df1 中,是的,df2 也有消除。类型“旧”可以保留在单独的数据框中。例如。如果 df1 uniqId 01 有一个新的计数为 2 并且 df2 已经有一个计数为 03 的 01,则生成的表具有针对 uniq id 01 的重复计数 5
@AlbinChandy 我建议用可能条件的完整示例以及预期输出将是什么来更新问题,否则很难理解逻辑。
嗨@Nitish,我在这里发布了一个新问题***.com/questions/70640200/… 你可以看看你是否可以。以上是关于Pyspark 基于另一个类似的数据框添加或删除数据框中的行的主要内容,如果未能解决你的问题,请参考以下文章
从列表中创建一个 pyspark 数据框列,其中列表的长度与数据框的行数相同