pyspark:如果列在不同行中具有相同的值,则合并两行或多行

Posted

技术标签:

【中文标题】pyspark:如果列在不同行中具有相同的值,则合并两行或多行【英文标题】:pyspark: Merge two or more rows if column have equal values in different rows 【发布时间】:2017-12-28 07:55:15 【问题描述】:

由于数据量大,我不得不使用pyspark将不同行中的dataframe值(一个列表)组合起来。

这样的数据框:

x = sqlContext.createDataFrame([("A", ['1','2','3']),("B", ['4','2','5','6']),("C", ['2','4','9','10']),("D", ['11','12','15','16'])],["index", "num_group"])
+-----+----------------+
|index|       num_group|
+-----+----------------+
|    A|       [1, 2, 3]|
|    B|    [4, 2, 5, 6]|
|    C|   [2, 4, 9, 10]|
|    D|[11, 12, 15, 16]|
+-----+----------------+

我想通过具有相同元素的列表合并num_group,如下所示: (索引是一个无意义的值或字符串)

+-------------------------+
|                num_group|
+-------------------------+
|[1, 2, 3, 4, 5, 6, 9, 10]|
|         [11, 12, 15, 16]|
+-------------------------+

我想我可以使用 graphframes GraphX 来查找连接并根据不同行中的相等值合并两行或多行。

有可能吗?我不太了解documents 的例子。

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

您不需要使用GraphX 库。 您所需要的只是collect_listudfexplode 中的pyspark.sql.functions 函数和一些小的python 操作。

因此,您要做的第一步是收集num_group 列中的所有lists

from pyspark.sql import functions as F
y = x.select(F.collect_list("num_group").alias("collected"))

应该给你dataframe

+----------------------------------------------------------------------------------------------------------+
|collected                                                                                                 |
+----------------------------------------------------------------------------------------------------------+
|[WrappedArray(1, 2, 3), WrappedArray(4, 2, 5, 6), WrappedArray(2, 4, 9, 10), WrappedArray(11, 12, 15, 16)]|
+----------------------------------------------------------------------------------------------------------+

下一步将定义一个udf 函数来遍历所有收集的列表并检查每个列表中的元素,并根据您的要求创建一个包含合并列表的新列表数组。

def computation(s):
    finalList = []
    finalList.append(list(str(i) for i in s[0]))
    for index in range(1, len(s)):
        for finals in finalList:
            check = False
            for x in s[index]:
                if x in finals:
                    check = True
                    break
            if check == True:
                finals_1 = finals + list(str(i) for i in s[index])
                finalList.remove(finals)
                finalList.append(sorted(list(set(str(i) for i in finals_1))))
            else:
                finalList.append(list(str(i) for i in s[index]))
    return finalList

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
collecting_udf = udf(computation, ArrayType(StringType()))

然后您可以只使用explode 函数将最终列表分成单独的行。

from pyspark.sql.functions import explode
y.select(explode(collecting_udf("collected")).alias("num_group"))

你应该有以下输出

+-------------------------+
|num_group                |
+-------------------------+
|[1, 10, 2, 3, 4, 5, 6, 9]|
|[11, 12, 15, 16]         |
+-------------------------+

【讨论】:

感谢您的回复。我已经尝试过您几天前自己提供的方式,但是由于数据量大(100,000,000行)而卡住了。这就是为什么我必须找到其他方法来解决这个问题! 你有其他解决这个问题的方法吗?

以上是关于pyspark:如果列在不同行中具有相同的值,则合并两行或多行的主要内容,如果未能解决你的问题,请参考以下文章

获取最新日期的最新结果,其中三列在mysql中具有相同的值

NSString 散列在不同版本的 iOS(不是 OS X)中具有相同的值吗?

如何在pyspark中连接具有相同名称的列的值

PHP MySQL将来自不同行的单元格组合在一起具有相同的值

如何在pyspark中使用具有相同客户ID的新数据框的值更新一行旧数据框

UITableView - 具有相同数组的部分中的不同行