pyspark 数据帧上的向量操作

Posted

技术标签:

【中文标题】pyspark 数据帧上的向量操作【英文标题】:Vector operation on pyspark dataframe 【发布时间】:2021-12-13 01:53:59 【问题描述】:

我是 pyspark 的新手。

我的数据框:

df = spark.createDataFrame([[10,  8], [3,  5], [1,  3], [1,  5], [2,  8], [8,  7]], list('AB'))
df.show()

+---+---+
|  A|  B|
+---+---+
| 10|  8|
|  3|  5|
|  1|  3|
|  1|  5|
|  2|  8|
|  8|  7|
+---+---+

通过VectorAssembler将 col'A' & col'B' 转换为向量:

from pyspark.ml.feature import VectorAssembler,Normalizer
Vector = VectorAssembler(inputCols=['A','B'], outputCol="Vector_AB").transform(df)

Normalizer的单位向量_AB:

Vector = Normalizer(inputCol="Vector_AB",outputCol="Unit_AB",p=2).transform(Vector)

+---+---+----------+--------------------+
|  A|  B| Vector_AB|             Unit_AB|
+---+---+----------+--------------------+
| 10|  8|[10.0,8.0]|[0.78086880944303...|
|  3|  5| [3.0,5.0]|[0.51449575542752...|
|  1|  3| [1.0,3.0]|[0.31622776601683...|
|  1|  5| [1.0,5.0]|[0.19611613513818...|
|  2|  8| [2.0,8.0]|[0.24253562503633...|
|  8|  7| [8.0,7.0]|[0.75257669470687...|
+---+---+----------+--------------------+

如何计算Vector_AB的内积? (2个规范)

喜欢, inputCol:'Vector_AB'-->[10.0,8.0],得到outputCol:Inner_Product_AB-->(10^2+8^2) = 164

我尝试:

Vector = Vector.withColumn('Inner_Product_AB', Vector['A']*Vector['A']+Vector['B']*Vector['B'])

有没有内置函数可以得到这个结果?

我想要的数据框:

+---+---+----------+--------------------+----------------+
|  A|  B| Vector_AB|             Norm_AB|Inner_Product_AB|
+---+---+----------+--------------------+----------------+
| 10|  8|[10.0,8.0]|[0.78086880944303...|             164|
|  3|  5| [3.0,5.0]|[0.51449575542752...|              34|
|  1|  3| [1.0,3.0]|[0.31622776601683...|              10|
|  1|  5| [1.0,5.0]|[0.19611613513818...|              26|
|  2|  8| [2.0,8.0]|[0.24253562503633...|              68|
|  8|  7| [8.0,7.0]|[0.75257669470687...|             113|
+---+---+----------+--------------------+----------------+

那我想做向量运算:col['Norm_AB']/col['Inner_Product_AB']

有没有可以做这个操作的内置函数?

【问题讨论】:

【参考方案1】:

Vector_AB的内积如何计算? (2个规范)

一种方法是使用内置函数 dot 即内积来定义一个对 pyspark.ml.linalg.DenseVector 对象进行操作的 UDF:

dot_prod_udf = F.udf(lambda v: int(v.dot(v)), LongType())

例子:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import FloatType


data = [
    "A": 10, "B": 8,
    "A": 3, "B": 5,
    "A": 1, "B": 3,
    "A": 1, "B": 5,
    "A": 2, "B": 8,
    "A": 8, "B": 7,
]

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)

Vector = VectorAssembler(inputCols=["A", "B"], outputCol="Vector_AB").transform(df)

dot_prod_udf = F.udf(lambda v: float(v.dot(v)), FloatType())
norm_udf = F.udf(lambda x, y: x / y, VectorUDT())

Vector = Vector.withColumn("Inner_Product_AB", dot_prod_udf("Vector_AB"))
Vector = Vector.withColumn("Inner_Product_AB_sqrt", F.sqrt("Inner_Product_AB"))
Vector = Vector.withColumn("Norm_AB", norm_udf("Vector_AB", "Inner_Product_AB_sqrt"))

结果:

+---+---+----------+----------------+---------------------+----------------------------------------+
|A  |B  |Vector_AB |Inner_Product_AB|Inner_Product_AB_sqrt|Norm_AB                                 |
+---+---+----------+----------------+---------------------+----------------------------------------+
|10 |8  |[10.0,8.0]|164.0           |12.806248474865697   |[0.7808688094430304,0.6246950475544243] |
|3  |5  |[3.0,5.0] |34.0            |5.830951894845301    |[0.5144957554275265,0.8574929257125441] |
|1  |3  |[1.0,3.0] |10.0            |3.1622776601683795   |[0.31622776601683794,0.9486832980505138]|
|1  |5  |[1.0,5.0] |26.0            |5.0990195135927845   |[0.19611613513818404,0.9805806756909202]|
|2  |8  |[2.0,8.0] |68.0            |8.246211251235321    |[0.24253562503633297,0.9701425001453319]|
|8  |7  |[8.0,7.0] |113.0           |10.63014581273465    |[0.7525766947068778,0.658504607868518]  |
+---+---+----------+----------------+---------------------+----------------------------------------+

【讨论】:

如何操作col['Vector_AB']/col['Inner_Product_AB'] @Drizzle 请查看更新后的答案。我包括了计算dot 产品的示例,它的sqrt 和它们Norm_AB 通过除以DenseVector

以上是关于pyspark 数据帧上的向量操作的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据帧上的复杂逻辑,包括前一行现有值以及动态生成的前一行值

在 pandas 数据帧上应用 Pyspark 管道

在 pyspark 中的数据帧上应用 udf 后出错

如何在 pyspark 中的数据帧上使用 fuzz.ratio

在 pyspark 数据帧上减少和 Lambda

如何在 pyspark 数据帧上应用 group by 并对结果对象进行转换