在 groupby 操作 PySpark 中聚合列中的稀疏向量

Posted

技术标签:

【中文标题】在 groupby 操作 PySpark 中聚合列中的稀疏向量【英文标题】:Aggregate Sparse Vectors in column in groupby operation PySpark 【发布时间】:2020-11-27 16:14:23 【问题描述】:

问题:我正在尝试将稀疏向量组合成一个每个 id(它应该是按 id 对行分组后的聚合结果)。

我正在使用的原始 DataFrame(以及我对其应用了转换方法)如下所示:

输入

+---+-------+--------+--------+
| id|   col1|    col2|    col3|
+---+-------+--------+--------+
|  1|  [Red]|  [John]|  [Male]|
|  1| [Blue]| [Alice]|[Female]|
|  1|[Green]|[Celine]|  [Male]|
|  2|  [Red]|   [Bob]|  [Male]|
|  1|  [Red]|  [John]|  [Male]|
|  2|[Green]| [Alice]|[Female]|
+---+-------+--------+--------+

到目前为止所做的是两个转换:

在第一步中,我使用 CountVectorizer 来获取每行每列的特征向量,输出:

+---+-------------+-------------+-------------+
|id |vectors1     |vectors2     |vectors3     |
+---+-------------+-------------+-------------+
|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|1  |(3,[2],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
|1  |(3,[1],[1.0])|(4,[2],[1.0])|(2,[0],[1.0])|
|2  |(3,[0],[1.0])|(4,[3],[1.0])|(2,[0],[1.0])|
|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|2  |(3,[1],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
+---+-------------+-------------+-------------+

在第二步(基于之前的输出),我确实使用了VectorAssembler 将所有这些列组合成一个名为features的列:

+---+-------------------------+
|id |features                 |
+---+-------------------------+
|1  |(9,[0,4,7],[1.0,1.0,1.0])|
|1  |(9,[2,3,8],[1.0,1.0,1.0])|
|1  |(9,[1,5,7],[1.0,1.0,1.0])|
|2  |(9,[0,6,7],[1.0,1.0,1.0])|
|1  |(9,[0,4,7],[1.0,1.0,1.0])|
|2  |(9,[1,3,8],[1.0,1.0,1.0])|
+---+-------------------------+

预期的解决方案/输出/数据帧:我想要实现的是找出 id 上的 groupby 操作,然后是一些可以转换第二个输出(或者可能是第一个输出)的特定聚合函数) 进入这个:

+--+-------------------------------------------------------+
|id|features                                               |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2 |            (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|
+--+-------------------------------------------------------+

我可能错了,但我很可能正在寻找可以对所有稀疏向量求和的东西,并且只组合它们的内部数组...

*也是我正在寻找的,但作为一个选项,可以在该 agg 函数中假设数组中特定特征的出现次数,因此可以将特征列转换为:

+--+-------------------------------------------------------+
|id|features                                               |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])| # 0: 2 times, 4: 2 times, 7: 3 times
|2 |            (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|
+--+-------------------------------------------------------+

【问题讨论】:

请阅读ml标签的描述。 【参考方案1】:

我遇到了一个非常相似的问题,刚刚找到了一个使用 UDF 的可怕解决方案。

从您提供的示例开始:

from pyspark.ml.linalg import SparseVector, DenseVector
import pyspark.sql.functions as F
df = sc.parallelize([
    (1, SparseVector(9,[0,4,7],[1.0,1.0,1.0])),
    (1, SparseVector(9,[2,3,8],[1.0,1.0,1.0])),
    (1, SparseVector(9,[1,5,7],[1.0,1.0,1.0])),
    (2, SparseVector(9,[0,6,7],[1.0,1.0,1.0])),
    (1, SparseVector(9,[0,4,7],[1.0,1.0,1.0])),
    (2, SparseVector(9,[1,3,8],[1.0,1.0,1.0])),
]).toDF(["id", "features"])

我创建了这个 UDF 来添加向量:

from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np

@F.udf(returnType=VectorUDT())
def elementwise_sum(vectors):
    res = None
    for vec in vectors:
        if res is None:
            res = vec
        else:
            res = np.add(vec,res)
    return SparseVector(len(res),k: v for k,v in enumerate(res) if v != 0)

这样你就可以聚合向量并返回一个结果向量

df = df.groupBy('id').agg(elementwise_sum(F.collect_list('features')).alias('features'))
df.show(10,False)

+---+-------------------------------------------------------+
|id |features_raw                                           |
+---+-------------------------------------------------------+
|1  |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])|
|2  |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+---+-------------------------------------------------------+

【讨论】:

以上是关于在 groupby 操作 PySpark 中聚合列中的稀疏向量的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中,是不是可以使用 1 个 groupBy 进行 2 个聚合?

pyspark:聚合列中最频繁的值

在 pyspark 中聚合 Kolmogorov Smirnov 测试

PySpark:如何在宽度可变的列数组上聚合?

Pyspark 将 StructType 列聚合为每行的元素数组 [重复]

具有聚合唯一值的pyspark dataframe groupby [重复]