使用 ArrayType 作为 bufferSchema 的 Spark UDAF 性能问题

Posted

技术标签:

【中文标题】使用 ArrayType 作为 bufferSchema 的 Spark UDAF 性能问题【英文标题】:Spark UDAF with ArrayType as bufferSchema performance issues 【发布时间】:2017-11-14 19:06:23 【问题描述】:

我正在开发一个返回元素数组的 UDAF。

每次更新的输入是索引和值的元组。

UDAF 所做的是将同一索引下的所有值相加。

例子:

对于输入(索引,值):(2,1), (3,1), (2,3)

应该返回 (0,0,4,1,...,0)

逻辑工作正常,但 update 方法 有问题,我的实现只为每行更新 1 个单元格,但该方法中的最后一个分配实际上复制整个数组 - 这是多余且非常耗时的。

仅此分配就占了我的查询执行时间的 98%

我的问题是,我怎样才能减少这个时间?是否可以在缓冲区数组中分配 1 个值而不必替换整个缓冲区?

P.S.:我正在使用 Spark 1.6,我无法很快升级它,所以请坚持使用适用于该版本的解决方案。

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = 
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  

  override def initialize(buffer: MutableAggregationBuffer): Unit = 
    buffer(0) = new Array[Long](bucketSize)
  

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = 
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices)
      arr1.update(i, arr1(i) + arr2(i))
    

    buffer1(0) = arr1
  

  override def evaluate(buffer: Row): Any = 
    buffer.getAs[mutable.WrappedArray[Long]](0)
  

【问题讨论】:

到目前为止,我找到了this link,这可能有助于解释更多事情。我希望有更好的解决方案。 【参考方案1】:

TL;DR要么不使用 UDAF,要么使用原始类型代替 ArrayType

没有UserDefinedFunction

这两种解决方案都应该避免在内部和外部表示之间进行昂贵的处理。

使用标准聚合和pivot

这使用标准 SQL 聚合。虽然在内部进行了优化,但当键的数量和数组的大小增加时可能会很昂贵。

给定输入:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

你可以:

import org.apache.spark.sql.functions.array, coalesce, col, lit

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

将 RDD API 与 combineByKey / aggregateByKey 一起使用。

带有可变缓冲区的普通旧 byKey 聚合。没有花里胡哨,但应该在广泛的输入范围内表现得相当好。如果你怀疑输入是稀疏的,你可以考虑更有效的中间表示,比如可变的Map

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
     case (acc, (index, value)) =>  acc(index) += value; acc ,
    (acc1, acc2) =>  for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1
  ).toDF
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

UserDefinedFunction 与原始类型一起使用

据我了解,性能瓶颈是ArrayConverter.toCatalystImpl

看起来它是为每个调用MutableAggregationBuffer.update 调用的,然后为每个Row 分配新的GenericArrayData

如果我们将bufferSchema 重新定义为:

def bufferSchema: StructType = 
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )

updatemerge 都可以表示为缓冲区中原始值的普通替换。调用链将保持相当长,但it won't require copies / conversions 和疯狂的分配。省略 null 检查您将需要类似于

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))

for(i <- 0 to nBuckets)
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))

分别。

最后evaluate应该把Row转换成输出Seq

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

请注意,在此实现中,可能的瓶颈是merge。虽然它不应该引入任何新的性能问题,但对于 M 个存储桶,对 merge 的每次调用都是 O(M)

使用 K 个唯一键和 P 个分区,在最坏的情况下,它将被调用 M * K 次,其中每个键,在每个分区上至少发生一次。这有效地将merge 组件的同谋度提高到O(M * N * K)

一般而言,您对此无能为力。但是,如果您对数据分布做出特定假设(数据是稀疏的,键分布是均匀的),您可以稍微简化一下,然后先洗牌:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

如果满足假设,它应该:

通过改组稀疏对而不是类似于密集数组的Rows,以违反直觉的方式减少改组大小。 仅使用更新聚合数据(每个 O(1))可能仅作为索引的子集触及。

但是,如果不满足一个或两个假设,您可以预期 shuffle 大小会增加,而更新次数将保持不变。同时,数据倾斜会使事情变得比update - shuffle - merge 的情况更糟。

使用Aggregator 和“强”输入Dataset

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder, Encoders

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable 
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = 
    val (i, v) = f(x)
    acc(i) += v
    acc
  

  def merge(acc1: Array[Long], acc2: Array[Long]) = 
    for 
      i <- 0 until bucketSize
     acc1(i) += acc2(i)
    acc1
  

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()

可以如下图使用

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+

注意

另请参阅SPARK-27296 - 用户定义的聚合函数 (UDAF) 存在主要的效率问题

【讨论】:

以上是关于使用 ArrayType 作为 bufferSchema 的 Spark UDAF 性能问题的主要内容,如果未能解决你的问题,请参考以下文章

使用 ArrayType 列将 UDF 重写为 pandas udf

SyntaxError:使用 ArrayType 创建 DataFrame 时语法无效

将 Spark 中的多个 ArrayType 列合并为一个 ArrayType 列

聚合 ArrayType 行由使用高阶函数的浮点数组成

使用 pyspark 将 StructType、ArrayType 转换/转换为 StringType(单值)

在 PySpark 中将 ArrayType(StringType()) 的列转换为 ArrayType(DateType())