pyspark:如果列在不同行中具有相同的值,则合并两行或多行
Posted
技术标签:
【中文标题】pyspark:如果列在不同行中具有相同的值,则合并两行或多行【英文标题】:pyspark: Merge two or more rows if column have equal values in different rows 【发布时间】:2017-12-28 07:55:15 【问题描述】:由于数据量大,我不得不使用pyspark
将不同行中的dataframe值(一个列表)组合起来。
这样的数据框:
x = sqlContext.createDataFrame([("A", ['1','2','3']),("B", ['4','2','5','6']),("C", ['2','4','9','10']),("D", ['11','12','15','16'])],["index", "num_group"])
+-----+----------------+
|index| num_group|
+-----+----------------+
| A| [1, 2, 3]|
| B| [4, 2, 5, 6]|
| C| [2, 4, 9, 10]|
| D|[11, 12, 15, 16]|
+-----+----------------+
我想通过具有相同元素的列表合并num_group
,如下所示:
(索引是一个无意义的值或字符串)
+-------------------------+
| num_group|
+-------------------------+
|[1, 2, 3, 4, 5, 6, 9, 10]|
| [11, 12, 15, 16]|
+-------------------------+
我想我可以使用 graphframes GraphX 来查找连接并根据不同行中的相等值合并两行或多行。
有可能吗?我不太了解documents 的例子。
任何帮助将不胜感激。
【问题讨论】:
【参考方案1】:您不需要使用GraphX 库。 您所需要的只是collect_list
、udf
和explode
中的pyspark.sql.functions
函数和一些小的python 操作。
因此,您要做的第一步是收集num_group
列中的所有lists
。
from pyspark.sql import functions as F
y = x.select(F.collect_list("num_group").alias("collected"))
应该给你dataframe
+----------------------------------------------------------------------------------------------------------+
|collected |
+----------------------------------------------------------------------------------------------------------+
|[WrappedArray(1, 2, 3), WrappedArray(4, 2, 5, 6), WrappedArray(2, 4, 9, 10), WrappedArray(11, 12, 15, 16)]|
+----------------------------------------------------------------------------------------------------------+
下一步将定义一个udf
函数来遍历所有收集的列表并检查每个列表中的元素,并根据您的要求创建一个包含合并列表的新列表数组。
def computation(s):
finalList = []
finalList.append(list(str(i) for i in s[0]))
for index in range(1, len(s)):
for finals in finalList:
check = False
for x in s[index]:
if x in finals:
check = True
break
if check == True:
finals_1 = finals + list(str(i) for i in s[index])
finalList.remove(finals)
finalList.append(sorted(list(set(str(i) for i in finals_1))))
else:
finalList.append(list(str(i) for i in s[index]))
return finalList
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
collecting_udf = udf(computation, ArrayType(StringType()))
然后您可以只使用explode
函数将最终列表分成单独的行。
from pyspark.sql.functions import explode
y.select(explode(collecting_udf("collected")).alias("num_group"))
你应该有以下输出
+-------------------------+
|num_group |
+-------------------------+
|[1, 10, 2, 3, 4, 5, 6, 9]|
|[11, 12, 15, 16] |
+-------------------------+
【讨论】:
感谢您的回复。我已经尝试过您几天前自己提供的方式,但是由于数据量大(100,000,000行)而卡住了。这就是为什么我必须找到其他方法来解决这个问题! 你有其他解决这个问题的方法吗?以上是关于pyspark:如果列在不同行中具有相同的值,则合并两行或多行的主要内容,如果未能解决你的问题,请参考以下文章
NSString 散列在不同版本的 iOS(不是 OS X)中具有相同的值吗?
PHP MySQL将来自不同行的单元格组合在一起具有相同的值