将重复记录合并到 pyspark 数据框中的单个记录中

Posted

技术标签:

【中文标题】将重复记录合并到 pyspark 数据框中的单个记录中【英文标题】:Merge duplicate records into single record in a pyspark dataframe 【发布时间】:2018-12-21 08:47:58 【问题描述】:

我有一个包含重复行的数据框,我想将它们合并到一个包含所有不同列的单个记录中。

我的代码示例如下:

df1= sqlContext.createDataFrame([("81A01","TERR NAME 01","NJ","",""),("81A01","TERR NAME 01","","NY",""),("81A01","TERR NAME 01","","","LA"),("81A02","TERR NAME 01","CA","",""),("81A02","TERR NAME 01","","","NY")], ["zip_code","territory_name","state","state1","state2"])

生成的数据框如下:

df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|      |      |
|   81A01|  TERR NAME 01|     |    NY|      |
|   81A01|  TERR NAME 01|     |      |    LA|
|   81A02|  TERR NAME 01|   CA|      |      |
|   81A02|  TERR NAME 01|     |      |    NY|
+--------+--------------+-----+------+------+

我需要根据邮政编码合并/合并重复记录,并在一行中获取所有不同的状态值。

预期结果:

+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    LA|
+--------+--------------+-----+------+------+

我是 pyspark 的新手,不知道如何使用组/加入。有人可以帮忙写代码吗?

【问题讨论】:

【参考方案1】:

如果您确定每个 zip_code 区域组合只有 1 个州、1 个州 1 和 1 个州 2,您可以使用以下代码。 max 函数使用字符串,如果分组数据中有字符串,因为非空字符串具有更高的值(可能是 ASCII)然后空字符串 ""

from pyspark.sql.types import *
from pyspark.sql.functions import *
df1= sqlContext.createDataFrame([("81A01","TERR NAME 01","NJ","",""),("81A01","TERR NAME 01","","NY",""),("81A01","TERR NAME 01","","","LA"),("81A02","TERR NAME 01","CA","",""),("81A02","TERR NAME 01","","","NY")], ["zip_code","territory_name","state","state1","state2"])

df1.groupBy("zip_code","territory_name").agg(max("state").alias("state"),max("state1").alias("state1"),max("state2").alias("state2")).show()

结果:

+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A02|  TERR NAME 01|   CA|      |    NY|
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
+--------+--------------+-----+------+------+

【讨论】:

【参考方案2】:

注意:对于zip_codeterritory_name的任何唯一记录,如果在任何状态列下有多个条目,则它们将是concatenated

一些解释:在这段代码中,我使用了RDDs。我首先将每条记录分为两个tuplestuple1keytuple2value。然后,我减少了keyx 对应于 (zip_code, territory_name)tuple1tuple2 包含 3 个状态列。 tuple1 被视为key,因为我们想要group by zip_codeterritory_name 的不同值。所以,像(81A01,TERR NAME 01)(81A02,TERR NAME 01) 这样的每个不同的对都是key,我们在其基础上reduceReduce 意味着一次取每两个值并对其执行一些operation,然后用这个结果和下一个元素重复相同的operation,直到整个元组用尽。

因此,使用+ operation 减少 (1,2,3,4,5) 将是 - 1+2=3,然后是 3+3=6 并执行 + operation 直到到达最后一个元素.因此,6+4=10,最后是10+5=15。由于元组以 5 结束,所以结果为 15。这就是 reduce+ 操作的工作方式。因为,这里我们有strings 而不是numbers,所以连接将发生A+B=AB

df1=df1.rdd.map(lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))\
       .reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))\
       .map(lambda r: (r[0][0],r[0][1],r[1][0],r[1][1],r[1][2]))\
       .toDF(["zip_code","territory_name","state","state1","state2"])
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    NY|
+--------+--------------+-----+------+------+

【讨论】:

谢谢,你能告诉我这里的x,y是什么吗?如果我有大量行说数千或十万,它会起作用吗?这是一个示例,在实时场景中,我预计需要处理大约 15 到 5 万条记录。 你好,是的,只要保持列结构,代码就可以工作,不管你有 5 Rows 还是 500K Rows。让我解释一下答案中的代码。 谢谢。在性能方面,上述 2 个答案中的哪一个更好。任何建议 我认为gaw给出的那个更好,因为它采用了SQLDataFrames的优化。您可以使用他的代码。我想提出一种解决问题的不同方法。 已添加说明。

以上是关于将重复记录合并到 pyspark 数据框中的单个记录中的主要内容,如果未能解决你的问题,请参考以下文章

使用较低的函数将pyspark数据框中单列中的值转换为文本清理中的小写[重复]

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框

如何将数据框中的人口单元格与现有数据框合并和重复?

Pyspark 数据框中的重复行

从 PySpark 中的数据框中删除重复项

从 PySpark 中的数据框中删除重复项