将重复记录合并到 pyspark 数据框中的单个记录中
Posted
技术标签:
【中文标题】将重复记录合并到 pyspark 数据框中的单个记录中【英文标题】:Merge duplicate records into single record in a pyspark dataframe 【发布时间】:2018-12-21 08:47:58 【问题描述】:我有一个包含重复行的数据框,我想将它们合并到一个包含所有不同列的单个记录中。
我的代码示例如下:
df1= sqlContext.createDataFrame([("81A01","TERR NAME 01","NJ","",""),("81A01","TERR NAME 01","","NY",""),("81A01","TERR NAME 01","","","LA"),("81A02","TERR NAME 01","CA","",""),("81A02","TERR NAME 01","","","NY")], ["zip_code","territory_name","state","state1","state2"])
生成的数据框如下:
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
| 81A01| TERR NAME 01| NJ| | |
| 81A01| TERR NAME 01| | NY| |
| 81A01| TERR NAME 01| | | LA|
| 81A02| TERR NAME 01| CA| | |
| 81A02| TERR NAME 01| | | NY|
+--------+--------------+-----+------+------+
我需要根据邮政编码合并/合并重复记录,并在一行中获取所有不同的状态值。
预期结果:
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
| 81A01| TERR NAME 01| NJ| NY| LA|
| 81A02| TERR NAME 01| CA| | LA|
+--------+--------------+-----+------+------+
我是 pyspark 的新手,不知道如何使用组/加入。有人可以帮忙写代码吗?
【问题讨论】:
【参考方案1】:如果您确定每个 zip_code 区域组合只有 1 个州、1 个州 1 和 1 个州 2,您可以使用以下代码。 max
函数使用字符串,如果分组数据中有字符串,因为非空字符串具有更高的值(可能是 ASCII)然后空字符串 ""
from pyspark.sql.types import *
from pyspark.sql.functions import *
df1= sqlContext.createDataFrame([("81A01","TERR NAME 01","NJ","",""),("81A01","TERR NAME 01","","NY",""),("81A01","TERR NAME 01","","","LA"),("81A02","TERR NAME 01","CA","",""),("81A02","TERR NAME 01","","","NY")], ["zip_code","territory_name","state","state1","state2"])
df1.groupBy("zip_code","territory_name").agg(max("state").alias("state"),max("state1").alias("state1"),max("state2").alias("state2")).show()
结果:
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
| 81A02| TERR NAME 01| CA| | NY|
| 81A01| TERR NAME 01| NJ| NY| LA|
+--------+--------------+-----+------+------+
【讨论】:
【参考方案2】:注意:对于zip_code
和territory_name
的任何唯一记录,如果在任何状态列下有多个条目,则它们将是concatenated
。
一些解释:在这段代码中,我使用了RDDs
。我首先将每条记录分为两个tuples
,tuple1
为key
,tuple2
为value
。然后,我减少了key
。 x
对应于 (zip_code, territory_name)
的 tuple1
和 tuple2
包含 3 个状态列。 tuple1
被视为key
,因为我们想要group by
zip_code
和territory_name
的不同值。所以,像(81A01,TERR NAME 01)
、(81A02,TERR NAME 01)
这样的每个不同的对都是key
,我们在其基础上reduce
。 Reduce
意味着一次取每两个值并对其执行一些operation
,然后用这个结果和下一个元素重复相同的operation
,直到整个元组用尽。
因此,使用+
operation
减少 (1,2,3,4,5) 将是 - 1+2=3
,然后是 3+3=6
并执行 +
operation
直到到达最后一个元素.因此,6+4=10
,最后是10+5=15
。由于元组以 5 结束,所以结果为 15。这就是 reduce
与 +
操作的工作方式。因为,这里我们有strings
而不是numbers
,所以连接将发生A+B=AB
。
df1=df1.rdd.map(lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))\
.reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))\
.map(lambda r: (r[0][0],r[0][1],r[1][0],r[1][1],r[1][2]))\
.toDF(["zip_code","territory_name","state","state1","state2"])
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
| 81A01| TERR NAME 01| NJ| NY| LA|
| 81A02| TERR NAME 01| CA| | NY|
+--------+--------------+-----+------+------+
【讨论】:
谢谢,你能告诉我这里的x,y是什么吗?如果我有大量行说数千或十万,它会起作用吗?这是一个示例,在实时场景中,我预计需要处理大约 15 到 5 万条记录。 你好,是的,只要保持列结构,代码就可以工作,不管你有 5 Rows 还是 500K Rows。让我解释一下答案中的代码。 谢谢。在性能方面,上述 2 个答案中的哪一个更好。任何建议 我认为gaw
给出的那个更好,因为它采用了SQL
和DataFrames
的优化。您可以使用他的代码。我想提出一种解决问题的不同方法。
已添加说明。以上是关于将重复记录合并到 pyspark 数据框中的单个记录中的主要内容,如果未能解决你的问题,请参考以下文章
使用较低的函数将pyspark数据框中单列中的值转换为文本清理中的小写[重复]