在 groupby 操作 PySpark 中聚合列中的稀疏向量
Posted
技术标签:
【中文标题】在 groupby 操作 PySpark 中聚合列中的稀疏向量【英文标题】:Aggregate Sparse Vectors in column in groupby operation PySpark 【发布时间】:2020-11-27 16:14:23 【问题描述】:问题:我正在尝试将稀疏向量组合成一个每个 id(它应该是按 id 对行分组后的聚合结果)。
我正在使用的原始 DataFrame(以及我对其应用了转换方法)如下所示:
输入:
+---+-------+--------+--------+
| id| col1| col2| col3|
+---+-------+--------+--------+
| 1| [Red]| [John]| [Male]|
| 1| [Blue]| [Alice]|[Female]|
| 1|[Green]|[Celine]| [Male]|
| 2| [Red]| [Bob]| [Male]|
| 1| [Red]| [John]| [Male]|
| 2|[Green]| [Alice]|[Female]|
+---+-------+--------+--------+
到目前为止所做的是两个转换:
在第一步中,我使用 CountVectorizer
来获取每行每列的特征向量,输出:
+---+-------------+-------------+-------------+
|id |vectors1 |vectors2 |vectors3 |
+---+-------------+-------------+-------------+
|1 |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|1 |(3,[2],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
|1 |(3,[1],[1.0])|(4,[2],[1.0])|(2,[0],[1.0])|
|2 |(3,[0],[1.0])|(4,[3],[1.0])|(2,[0],[1.0])|
|1 |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|2 |(3,[1],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
+---+-------------+-------------+-------------+
在第二步(基于之前的输出),我确实使用了VectorAssembler
将所有这些列组合成一个名为features的列:
+---+-------------------------+
|id |features |
+---+-------------------------+
|1 |(9,[0,4,7],[1.0,1.0,1.0])|
|1 |(9,[2,3,8],[1.0,1.0,1.0])|
|1 |(9,[1,5,7],[1.0,1.0,1.0])|
|2 |(9,[0,6,7],[1.0,1.0,1.0])|
|1 |(9,[0,4,7],[1.0,1.0,1.0])|
|2 |(9,[1,3,8],[1.0,1.0,1.0])|
+---+-------------------------+
预期的解决方案/输出/数据帧:我想要实现的是找出 id 上的 groupby 操作,然后是一些可以转换第二个输出(或者可能是第一个输出)的特定聚合函数) 进入这个:
+--+-------------------------------------------------------+
|id|features |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2 | (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|
+--+-------------------------------------------------------+
我可能错了,但我很可能正在寻找可以对所有稀疏向量求和的东西,并且只组合它们的内部数组...
*也是我正在寻找的,但作为一个选项,可以在该 agg 函数中假设数组中特定特征的出现次数,因此可以将特征列转换为:
+--+-------------------------------------------------------+
|id|features |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])| # 0: 2 times, 4: 2 times, 7: 3 times
|2 | (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|
+--+-------------------------------------------------------+
【问题讨论】:
请阅读ml标签的描述。 【参考方案1】:我遇到了一个非常相似的问题,刚刚找到了一个使用 UDF 的可怕解决方案。
从您提供的示例开始:
from pyspark.ml.linalg import SparseVector, DenseVector
import pyspark.sql.functions as F
df = sc.parallelize([
(1, SparseVector(9,[0,4,7],[1.0,1.0,1.0])),
(1, SparseVector(9,[2,3,8],[1.0,1.0,1.0])),
(1, SparseVector(9,[1,5,7],[1.0,1.0,1.0])),
(2, SparseVector(9,[0,6,7],[1.0,1.0,1.0])),
(1, SparseVector(9,[0,4,7],[1.0,1.0,1.0])),
(2, SparseVector(9,[1,3,8],[1.0,1.0,1.0])),
]).toDF(["id", "features"])
我创建了这个 UDF 来添加向量:
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
@F.udf(returnType=VectorUDT())
def elementwise_sum(vectors):
res = None
for vec in vectors:
if res is None:
res = vec
else:
res = np.add(vec,res)
return SparseVector(len(res),k: v for k,v in enumerate(res) if v != 0)
这样你就可以聚合向量并返回一个结果向量
df = df.groupBy('id').agg(elementwise_sum(F.collect_list('features')).alias('features'))
df.show(10,False)
+---+-------------------------------------------------------+
|id |features_raw |
+---+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])|
|2 |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0]) |
+---+-------------------------------------------------------+
【讨论】:
以上是关于在 groupby 操作 PySpark 中聚合列中的稀疏向量的主要内容,如果未能解决你的问题,请参考以下文章
在 pyspark 中,是不是可以使用 1 个 groupBy 进行 2 个聚合?
在 pyspark 中聚合 Kolmogorov Smirnov 测试