如何定义自定义聚合函数来对一列向量求和?
Posted
技术标签:
【中文标题】如何定义自定义聚合函数来对一列向量求和?【英文标题】:How to define a custom aggregation function to sum a column of Vectors? 【发布时间】:2016-02-27 06:24:51 【问题描述】:我有一个包含两列的 DataFrame,Int
类型的 ID
和 Vector
(org.apache.spark.mllib.linalg.Vector
) 类型的 Vec
。
DataFrame 如下所示:
ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....
我想做一个groupBy($"ID")
然后通过对向量求和来对每个组内的行应用聚合。
上述示例的期望输出是:
ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...
可用的聚合函数将不起作用,例如df.groupBy($"ID").agg(sum($"Vec")
将导致 ClassCastException。
如何实现自定义聚合函数,允许我对向量或数组进行求和或任何其他自定义操作?
【问题讨论】:
How can I define and use a User-Defined Aggregate Function in Spark SQL?的可能重复 如果有人试图在 pyspark 中做类似的事情,语法在这里:***.com/questions/54354915/… 【参考方案1】:火花 >= 3.0
您可以将Summarizer
与sum
一起使用
import org.apache.spark.ml.stat.Summarizer
df
.groupBy($"id")
.agg(Summarizer.sum($"vec").alias("vec"))
火花
就我个人而言,我不会为 UDAF 烦恼。不仅仅是冗长而且不是很快(Spark UDAF with ArrayType as bufferSchema performance issues)相反,我会简单地使用reduceByKey
/ foldByKey
:
import org.apache.spark.sql.Row
import breeze.linalg.DenseVector => BDV
import org.apache.spark.ml.linalg.Vector, Vectors
def dv(values: Double*): Vector = Vectors.dense(values.toArray)
val df = spark.createDataFrame(Seq(
(1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)),
(2, dv(7,5,0)), (2, dv(3,3,4)),
(3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7)))
).toDF("id", "vec")
val aggregated = df
.rdd
.map case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values))
.foldByKey(BDV.zeros[Double](3))(_ += _)
.mapValues(v => Vectors.dense(v.toArray))
.toDF("id", "vec")
aggregated.show
// +---+--------------+
// | id| vec|
// +---+--------------+
// | 1| [5.0,2.0,7.0]|
// | 2|[10.0,8.0,4.0]|
// | 3|[7.0,15.0,9.0]|
// +---+--------------+
只是为了比较一个“简单的”UDAF。所需的进口:
import org.apache.spark.sql.expressions.MutableAggregationBuffer,
UserDefinedAggregateFunction
import org.apache.spark.ml.linalg.Vector, Vectors, SQLDataTypes
import org.apache.spark.sql.types.StructType, ArrayType, DoubleType
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
类定义:
class VectorSum (n: Int) extends UserDefinedAggregateFunction
def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
def dataType = SQLDataTypes.VectorType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) =
buffer.update(0, Array.fill(n)(0.0))
def update(buffer: MutableAggregationBuffer, input: Row) =
if (!input.isNullAt(0))
val buff = buffer.getAs[WrappedArray[Double]](0)
val v = input.getAs[Vector](0).toSparse
for (i <- v.indices)
buff(i) += v(i)
buffer.update(0, buff)
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) =
val buff1 = buffer1.getAs[WrappedArray[Double]](0)
val buff2 = buffer2.getAs[WrappedArray[Double]](0)
for ((x, i) <- buff2.zipWithIndex)
buff1(i) += x
buffer1.update(0, buff1)
def evaluate(buffer: Row) = Vectors.dense(
buffer.getAs[Seq[Double]](0).toArray)
还有一个用法示例:
df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show
// +---+--------------+
// | id| vec|
// +---+--------------+
// | 1| [5.0,2.0,7.0]|
// | 2|[10.0,8.0,4.0]|
// | 3|[7.0,15.0,9.0]|
// +---+--------------+
另请参阅:How to find mean of grouped Vector columns in Spark SQL?。
【讨论】:
我知道诀窍是使用breeze.linalg.DensVector,为什么它可以工作而mllib.linalg 的密集向量没有? 问题是mllib.linalg.Vector
的Scala版本没有+
方法。
@oluies 除了 UDAF?您可以解构内部数组,单独聚合,然后重新创建。但是,如果您询问开箱即用的清洁解决方案,我不知道。
@zero323 我现在在 Sark 2.0 上尝试这个,我将向量传递给规范化器但没有成功:org.apache.spark.mllib.linalg.DenseVector 无法转换为 org. apache.spark.ml.linalg.Vector 是否有关于 spark 2.0 的更新?
@Rami 你需要o.a.s.ml.linalg
导入。【参考方案2】:
我建议以下(适用于 Spark 2.0.2 及更高版本),它可能已优化但非常好,您必须提前知道的一件事是创建 UDAF 实例时的矢量大小
import org.apache.spark.ml.linalg._
import org.apache.spark.mllib.linalg.WeightedSparseVector
import org.apache.spark.sql.expressions.MutableAggregationBuffer, UserDefinedAggregateFunction
import org.apache.spark.sql.types._
class VectorAggregate(val numFeatures: Int)
extends UserDefinedAggregateFunction
private type B = Map[Int, Double]
def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil)
def bufferSchema: StructType =
StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil)
def initialize(buffer: MutableAggregationBuffer): Unit =
buffer.update(0, Map.empty[Int, Double])
def update(buffer: MutableAggregationBuffer, input: Row): Unit =
val zero = buffer.getAs[B](0)
input match
case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero)case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d)))
case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero)case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d)))
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
val zero = buffer1.getAs[B](0)
buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero)case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d)))
def deterministic: Boolean = true
def evaluate(buffer: Row): Any =
val Row(agg: B) = buffer
val indices = agg.keys.toArray.sorted
Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed
def dataType: DataType = new VectorUDT()
【讨论】:
【参考方案3】:使用 pyspark 3.0.0,这是我的版本,您可以使用 Summarizer 轻松完成。您的 col 需要是 DenseVector
的类型from pyspark.ml.stat import Summarizer
sdf.groupBy("ID").agg(Summarizer.mean(sdf.Vec)).show()
注意:pyspark中没有avg函数,但是可以使用mean方法
【讨论】:
以上是关于如何定义自定义聚合函数来对一列向量求和?的主要内容,如果未能解决你的问题,请参考以下文章
python自定义聚合函数,merge与transform的区别
Flink Table API & SQL 自定义 Aggregate 聚合函数
Flink Table API & SQL 自定义 Aggregate 聚合函数