Vector operation on pyspark dataframe
【Posted】: 2021-12-13 01:53:59
【Question】: I'm new to PySpark.
My dataframe:
df = spark.createDataFrame([[10, 8], [3, 5], [1, 3], [1, 5], [2, 8], [8, 7]], list('AB'))
df.show()
+---+---+
| A| B|
+---+---+
| 10| 8|
| 3| 5|
| 1| 3|
| 1| 5|
| 2| 8|
| 8| 7|
+---+---+
Convert col 'A' & col 'B' into a vector column via VectorAssembler:
from pyspark.ml.feature import VectorAssembler,Normalizer
Vector = VectorAssembler(inputCols=['A','B'], outputCol="Vector_AB").transform(df)
Unit vector Unit_AB via Normalizer:
Vector = Normalizer(inputCol="Vector_AB",outputCol="Unit_AB",p=2).transform(Vector)
+---+---+----------+--------------------+
| A| B| Vector_AB| Unit_AB|
+---+---+----------+--------------------+
| 10| 8|[10.0,8.0]|[0.78086880944303...|
| 3| 5| [3.0,5.0]|[0.51449575542752...|
| 1| 3| [1.0,3.0]|[0.31622776601683...|
| 1| 5| [1.0,5.0]|[0.19611613513818...|
| 2| 8| [2.0,8.0]|[0.24253562503633...|
| 8| 7| [8.0,7.0]|[0.75257669470687...|
+---+---+----------+--------------------+
How can I compute the inner product of Vector_AB with itself, i.e. its squared 2-norm?
For example, for inputCol 'Vector_AB' -> [10.0,8.0], the outputCol 'Inner_Product_AB' should be (10^2 + 8^2) = 164.
I tried:
Vector = Vector.withColumn('Inner_Product_AB', Vector['A']*Vector['A']+Vector['B']*Vector['B'])
Is there a built-in function to get this result?
My desired dataframe:
+---+---+----------+--------------------+----------------+
| A| B| Vector_AB| Norm_AB|Inner_Product_AB|
+---+---+----------+--------------------+----------------+
| 10| 8|[10.0,8.0]|[0.78086880944303...| 164|
| 3| 5| [3.0,5.0]|[0.51449575542752...| 34|
| 1| 3| [1.0,3.0]|[0.31622776601683...| 10|
| 1| 5| [1.0,5.0]|[0.19611613513818...| 26|
| 2| 8| [2.0,8.0]|[0.24253562503633...| 68|
| 8| 7| [8.0,7.0]|[0.75257669470687...| 113|
+---+---+----------+--------------------+----------------+
Then I want to do the vector operation col['Norm_AB'] / col['Inner_Product_AB']. Is there a built-in function for that operation?
【Comments】:
【Answer 1】: "How can I compute the inner product of Vector_AB? (squared 2-norm)"
One way is to define a UDF that operates on pyspark.ml.linalg.DenseVector objects and uses the built-in dot method, i.e. the inner product:
dot_prod_udf = F.udf(lambda v: int(v.dot(v)), LongType())
Example:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import FloatType
data = [
    {"A": 10, "B": 8},
    {"A": 3, "B": 5},
    {"A": 1, "B": 3},
    {"A": 1, "B": 5},
    {"A": 2, "B": 8},
    {"A": 8, "B": 7},
]

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)

# Assemble columns A and B into a single vector column.
Vector = VectorAssembler(inputCols=["A", "B"], outputCol="Vector_AB").transform(df)

# Inner product of the vector with itself (sum of squared elements).
dot_prod_udf = F.udf(lambda v: float(v.dot(v)), FloatType())
# Divide a DenseVector element-wise by a scalar to get the unit vector.
norm_udf = F.udf(lambda x, y: x / y, VectorUDT())

Vector = Vector.withColumn("Inner_Product_AB", dot_prod_udf("Vector_AB"))
Vector = Vector.withColumn("Inner_Product_AB_sqrt", F.sqrt("Inner_Product_AB"))
Vector = Vector.withColumn("Norm_AB", norm_udf("Vector_AB", "Inner_Product_AB_sqrt"))
Result:
+---+---+----------+----------------+---------------------+----------------------------------------+
|A |B |Vector_AB |Inner_Product_AB|Inner_Product_AB_sqrt|Norm_AB |
+---+---+----------+----------------+---------------------+----------------------------------------+
|10 |8 |[10.0,8.0]|164.0 |12.806248474865697 |[0.7808688094430304,0.6246950475544243] |
|3 |5 |[3.0,5.0] |34.0 |5.830951894845301 |[0.5144957554275265,0.8574929257125441] |
|1 |3 |[1.0,3.0] |10.0 |3.1622776601683795 |[0.31622776601683794,0.9486832980505138]|
|1 |5 |[1.0,5.0] |26.0 |5.0990195135927845 |[0.19611613513818404,0.9805806756909202]|
|2 |8 |[2.0,8.0] |68.0 |8.246211251235321 |[0.24253562503633297,0.9701425001453319]|
|8 |7 |[8.0,7.0] |113.0 |10.63014581273465 |[0.7525766947068778,0.658504607868518] |
+---+---+----------+----------------+---------------------+----------------------------------------+
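If you want to avoid a Python UDF entirely, one possible alternative (a sketch, not part of the original answer, assuming Spark >= 3.1 and the column names from the example above) is to convert the vector column to an array with pyspark.ml.functions.vector_to_array and sum the squared elements with the built-in aggregate higher-order function:

from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F

# Sum of squares of the vector elements with built-in higher-order
# functions instead of a UDF (assumes Spark >= 3.1).
no_udf = Vector.withColumn(
    "Inner_Product_AB",
    F.aggregate(
        vector_to_array("Vector_AB"),  # VectorUDT -> array<double>
        F.lit(0.0),                    # initial accumulator value
        lambda acc, x: acc + x * x,    # add each squared element
    ),
)

Note that vector_to_array produces a plain array&lt;double&gt; column, so this path stays in Spark SQL types rather than ML vector types.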
【Discussion】:
How can I do the operation col['Vector_AB'] / col['Inner_Product_AB']?
@Drizzle Please see the updated answer. I included an example that computes the dot product, its sqrt, and Norm_AB by dividing the DenseVector by it.
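For the element-wise division itself, a possible UDF-free sketch (again an assumption: Spark >= 3.1, working on the array form of the vector rather than a DenseVector, with the column names from this answer) uses the transform higher-order function, whose lambda can reference other columns of the same row:

from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F

# Divide each vector element by the scalar column, without a UDF.
# The result is an array<double>, not a VectorUDT column.
Vector = Vector.withColumn(
    "Norm_AB_array",
    F.transform(
        vector_to_array("Vector_AB"),
        lambda x: x / F.col("Inner_Product_AB_sqrt"),
    ),
)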