Pyspark 基于另一个类似的数据框添加或删除数据框中的行

Posted

技术标签:

【中文标题】Pyspark 基于另一个类似的数据框添加或删除数据框中的行【英文标题】:Pyspark add or remove rows in a dataframe based on another similar dataframe 【发布时间】:2021-12-30 18:43:50 【问题描述】:

假设我有两个具有相同架构的数据框 DF1 和 DF2。 我想做的是:

对于 DF1 中的每一行,

如果 DF2 中存在 DF1.uniqueId 并且类型是新的,则添加到 DF2 并重复计数。

如果 DF2 中存在 DF1.uniqueId 且类型为旧,则将 DF2 类型更改为 DF1 类型(旧)。

如果 DF2 中不存在 DF1.uniqueId 并且类型是新的,则在 DF2 中添加一个新行。

如果 DF1.uniqueId 在 DF2 中不存在并且类型是旧的,则将该行移动到新表 -DF3

即。如果表格如下所示,则结果或更新后的 DF2 应该类似于下面的 resultDF2 表格

DF1

+----------+--------------------------+
|UniqueID  |type_                     |
+----------+--------------------------+
|1         |new                       |           
|1         |new                       |      
|1         |new                       |
|2         |old                       |
|1         |new                       |
+----------+--------------------------+

DF2

+----------+--------------------------+
|UniqueID  |type_                     |
+----------+--------------------------+
|          |                          |
+----------+--------------------------+


结果DF2

+----------+--------------------------++----------+--------------------------+
|UniqueID  |type_                     |          repeatCount                 |
+----------+--------------------------++----------+--------------------------+
|    1     |   new                    |                    3                 |
+----------+--------------------------++----------+--------------------------+

结果DF3

+----------+--------------------------++----------+--------------------------+
|UniqueID  |type_                     |          repeatCount                 |
+----------+--------------------------++----------+--------------------------+
|    1     |   old                    |                    0                 |
+----------+--------------------------++----------+--------------------------+

** 如果只有一个条目,repeatCount 为零。

我正在尝试使用 pyspark 来实现这一点。 考虑到我的两个表都在内存中,任何人都可以向我建议如何实现这一点。

【问题讨论】:

你能再澄清一下条件吗?如果 uniqueID 在数据框中既有新的type_ 又有旧的type_,会发生什么情况? 很抱歉没有提及,如果数据帧(DF1)具有相同uniqueId的新旧type_,那么它们将作为一对被取消。 【参考方案1】:

可以通过以下方式获得所需的输出:

    UniqueId 上分组df1 并获取repeatCount,在此操作期间删除具有oldnew type_UniqueId。 在第 1 步的数据框和 df2 之间应用 Full Join。 从连接结果中,删除df2 中不存在df.UniqueIddf1.type_old 的行。 最后选择UniqueIDtype_repeatCount
from pyspark.sql import functions as F

data = [(1, "new",), # Not exists and new
        (1, "new",),
        (1, "new",),
        (2, "old",), # Not exists and old
        (1, "new",),
        (3, "old",), # cancel out
        (3, "new",), # cancel out
        (4, "new",), # one entry count zero example
        (5, "new",), # Exists and new
        (6, "old",), ] # Exists and old
df1 = spark.createDataFrame(data, ("UniqueID", "type_", ))
df2 = spark.createDataFrame([(5, "new", ), (6, "new", ), ], ("UniqueID", "type_", ))

df1_grouped = (df1.groupBy("UniqueID").agg(F.collect_set("type_").alias("types_"), 
                                           (F.count("type_") - F.lit(1)).alias("repeatCount"))
                  .filter(F.size(F.col("types_")) == 1) # when more than one type of `type_` is present they cancel out
                  .withColumn("type_", F.col("types_")[0])
                  .drop("types_")
              )

id_not_exists_old = (df2["UniqueID"].isNull() & (df1_grouped["type_"] == F.lit("old")))

(df1_grouped.join(df2, df1_grouped["UniqueID"] == df2["UniqueID"], "full")
            .filter(~(id_not_exists_old))
            .select(df1_grouped["UniqueID"], df1_grouped["type_"], "repeatCount")
).show()

"""
+--------+-----+-----------+
|UniqueID|type_|repeatCount|
+--------+-----+-----------+
|       1|  new|          3|
|       4|  new|          0|
|       5|  new|          0|
|       6|  old|          0|
+--------+-----+-----------+
"""

【讨论】:

感谢@nithish,一件事是,由于 df2 表中已经存在唯一 ID 5,因此最终结果需要对 uniqueId5 进行重复计数 1。 @AlbinChandy 所以对于df1 repeatCountcount(UniqueID) - 1 但对于df2 它只是count(UniqueID)df2 也会取消吗? 问题是 df2 是一个包含所有类型“新”项目的主表,df1 是周期性重复的批次,在组合重复的项目后将插入到 df1 中,是的,df2 也有消除。类型“旧”可以保留在单独的数据框中。例如。如果 df1 uniqId 01 有一个新的计数为 2 并且 df2 已经有一个计数为 03 的 01,则生成的表具有针对 uniq id 01 的重复计数 5 @AlbinChandy 我建议用可能条件的完整示例以及预期输出将是什么来更新问题,否则很难理解逻辑。 嗨@Nitish,我在这里发布了一个新问题***.com/questions/70640200/… 你可以看看你是否可以。

以上是关于Pyspark 基于另一个类似的数据框添加或删除数据框中的行的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:基于数据框中具有 UUID 的列添加新列

从列表中创建一个 pyspark 数据框列,其中列表的长度与数据框的行数相同

pyspark将列添加到列表中已经不存在的数据框

如何从pyspark数据框列值中删除方括号

如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?

PySpark - 如何使用连接更新 Dataframe?