何时合并发生在Spark中的用户定义聚合函数UDAF中

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了何时合并发生在Spark中的用户定义聚合函数UDAF中相关的知识,希望对你有一定的参考价值。

我想知道Spark将在哪些情况下执行合并作为UDAF功能的一部分。

动机:我在Spark项目中使用了许多UDAF函数。我经常想回答一个问题:

在30天的窗口中,当前交易在同一国家/地区进行了多少次信用卡交易?

窗口将从当前事务开始,但不会将其包含在计数中。它需要当前交易的价值来了解过去30天内要计算的国家/地区。

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

我写了我的customUDAF进行计数。我总是使用.orderBy(orderByColumn.desc)并且由于.desc,当前交易在计算期间显示为窗口中的第一个。

UDAF函数需要实现merge函数,该函数在并行计算中合并两个中间聚合缓冲区。如果发生任何合并,我的current transaction对于不同的缓冲区可能不一样,并且UDAF的结果将是不正确的。

我写了一个UDAF函数来计算我的数据集中的合并次数,并且只保留窗口中的第一个事务以与当前事务进行比较。

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  def evaluate(buffer: Row) = buffer
}

当我在具有16个CPU的本地主服务器上使用spark 2.0.1运行它时,从来没有任何合并,并且窗口中的第一个事务始终是当前事务。这就是我要的。在不久的将来,我将在x100更大的数据集和真正的分布式Spark集群上运行我的代码,并想知道合并是否可以在那里发生。

问题:

  • 在哪些情况下/条件合并发生在UDAF?
  • Windows与orderBy有没有合并?
  • 有可能告诉Spark不要合并吗?
答案

在哪些情况下/条件合并发生在UDAF?

当在混洗后合并部分应用聚合函数(“映射侧聚合”)时,调用merge(“减少副聚合”)。

Windows与orderBy有没有合并?

在目前的实施中永远不会。至于现在窗口函数只是花哨的groupByKey,并没有部分聚合。这当然是实施细节,未来可能会更改,恕不另行通知。

有可能告诉Spark不要合并吗?

它不是。但是,如果数据已经由聚合键分区,则不需要merge,只使用combine

最后:

在30天的窗口中,当前交易在同一国家/地区进行了多少次信用卡交易?

不要求UDAFs或窗口功能。我可能会创建使用o.a.s.sql.functions.window的翻滚窗口,按用户,国家和窗口聚合并与输入连接。

以上是关于何时合并发生在Spark中的用户定义聚合函数UDAF中的主要内容,如果未能解决你的问题,请参考以下文章

用户定义的聚合函数 Spark Java - 合并问题

极简spark教程spark聚合函数

如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数

如何在 Spark SQL 中定义和使用用户定义的聚合函数?

在 pyspark 中应用用户定义的聚合函数的替代方法

P1008(“禁止与用户声明的构造函数聚合”)在实践中何时有用?