Scala 聚合函数的示例

Posted

技术标签:

【中文标题】Scala 聚合函数的示例【英文标题】:Example of the Scala aggregate function 【发布时间】:2011-10-19 04:38:35 【问题描述】:

我一直在寻找,但找不到我能理解的 Scala 中 aggregate 函数的示例或讨论。它看起来很强大。

这个函数可以用来减少元组的值来创建一个多图类型的集合吗?例如:

val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))

应用聚合后:

Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv"))

另外,你能给出参数zsegopcombop的例子吗?我不清楚这些参数的作用。

【问题讨论】:

【参考方案1】:

让我们看看一些 ascii 艺术是否没有帮助。考虑aggregate的类型签名:

def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B

另外,请注意A 指的是集合的类型。所以,假设我们在这个集合中有 4 个元素,那么 aggregate 可能会像这样工作:

z   A   z   A   z   A   z   A
 \ /     \ /seqop\ /     \ /    
  B       B       B       B
    \   /  combop   \   /
      B _           _ B
         \ combop  /
              B

让我们看一个实际的例子。假设我有一个GenSeq("This", "is", "an", "example"),我想知道其中有多少个字符。我可以写如下:

注意在下面的 sn-p 代码中使用了par。传递给聚合的第二个函数是在计算单个序列之后调用的函数。 Scala 只能对可以并行化的集合执行此操作。

import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)

所以,首先它会计算:

0 + "This".length     // 4
0 + "is".length       // 2
0 + "an".length       // 2
0 + "example".length  // 7

它接下来会做什么无法预测(组合结果的方法不止一种),但它可能会这样做(就像上面的 ascii 艺术):

4 + 2 // 6
2 + 7 // 9

在什么时候结束

6 + 9 // 15

给出最终结果。现在,这在结构上与foldLeft 有点相似,但它有一个额外的功能(B, B) => B,这是 fold 没有的。但是,此功能使其能够并行工作!

例如,考虑一下,四个初始计算中的每一个都是相互独立的,并且可以并行完成。接下来的两个(导致 6 和 9)可以在它们所依赖的计算完成后启动,但是这两个可以并行运行。

如上所述并行化的 7 次计算只需同时进行 3 次串行计算。

实际上,对于如此小的集合,同步计算的成本将足以抵消任何收益。此外,如果你折叠它,总共只需要 4 次计算。但是,一旦您的收藏变得更大,您就会开始看到一些真正的收益。

另一方面,考虑foldLeft。因为它没有附加功能,所以它不能并行任何计算:

(((0 + "This".length) + "is".length) + "an".length) + "example".length

必须先计算每个内括号,然后才能进行外括号。

【讨论】:

我们可以说这类似于map reduce,seqop扮演mapper函数,combop扮演reducer函数?我也是一个新手,并试图理解语义。感谢 ASCII 艺术,绝对有帮助! 是的。树聚合结构是aggregate 存在的关键。 这很令人困惑,因为在您的示例中 combop 从未被调用。你可以通过简单地为第二个参数做任何你想做的事情来看到这一点。例如返回数字 11242414,你​​仍然得到相同的答案 15。 我对此进行了更多研究,发现从未调用 combop 函数,因为您使用的集合不可并行化。如果您在聚合之前调用par,它将确保调用组合。【参考方案2】:

聚合函数不这样做(除了它是一个非常通用的函数,并且可以用来这样做)。你想要groupBy。至少接近。当您从Seq[(String, String)] 开始时,您通过获取元组中的第一项(即(String, String) => String),它将返回Map[String, Seq[(String, String)])进行分组。然后,您必须丢弃 Seq[String, String)] 值中的第一个参数。

所以

list.groupBy(_._1).mapValues(_.map(_._2))

你会得到一个Map[String, Seq[(String, String)]。如果您想要 Seq 而不是 Map,请在结果上调用 toSeq。不过,我认为您不能保证生成的 Seq 中的顺序


聚合是一个更难的函数。

首先考虑 reduceLeft 和 reduceRight。 让asA 类型元素的非空序列as = Seq(a1, ... an),而f: (A,A) => A 是将A 类型的两个元素组合成一个的某种方式。我会将它记为二元运算符@a1 @ a2 而不是f(a1, a2)as.reduceLeft(@) 将计算 (((a1 @ a2) @ a3)... @ an)reduceRight 将括号放在相反的位置,(a1 @ (a2 @... @ an))))。如果@ 恰好是关联的,则不会关心括号。可以将其计算为(a1 @... @ ap) @ (ap+1 @...@an)(两个大括号内也会有括号,但我们不要在意)。然后可以并行执行这两个部分,而 reduceLeft 或 reduceRight 中的嵌套括号强制执行完全顺序计算。但是只有当@ 被认为是关联的并且reduceLeft 方法不知道这一点时才可能进行并行计算。

仍然可以有方法reduce,其调用者将负责确保操作是关联的。然后reduce 将按照它认为合适的方式对调用进行排序,可能并行进行。确实有这样的方法。

然而,各种 reduce 方法都有一个限制。 Seq 的元素只能组合成相同类型的结果:@ 必须是 (A,A) => A。但是一个更普遍的问题是将它们组合成一个B。一个以B 类型的值b 开始,并将其与序列的每个元素组合。运算符@(B,A) => B,计算(((b @ a1) @ a2) ... @ an)foldLeft 就是这样做的。 foldRight 做同样的事情,但从 an 开始。在那里,@ 操作没有机会关联。当一个人写b @ a1 @ a2 时,它一定意味着(b @ a1) @ a2,因为(a1 @ a2) 将是错误的类型。所以 foldLeft 和 foldRight 必须是顺序的。

然而,假设每个A 都可以变成B,让我们用! 来写它,a! 的类型是B。此外假设有一个+ 操作(B,B) => B,并且@ 使得b @ a 实际上是b + a!。与其用@组合元素,不如先用!将它们全部转换为B,然后用+组合它们。那将是as.map(!).reduceLeft(+)。如果+ 是关联的,那么可以使用reduce 来完成,而不是顺序的:as.map(!).reduce(+)。可能有一个假设的方法 as.associativeFold(b, !, +)。

Aggregate 非常接近这一点。然而,可能有一种比b+a! 更有效的方法来实现b@a 例如,如果类型BList[A],而b@a 是a::b,那么a! 将是a::Nilb1 + b2 将是 b2 ::: b1。 a::b 比 (a::Nil):::b 好得多。要从关联性中受益,但仍使用@,首先将b + a1! + ... + an! 拆分为(b + a1! + ap!) + (ap+1! + ..+ an!),然后返回使用@(b @ a1 @ an) + (ap+1! @ @ an)。一个仍然需要!在 ap+1 上,因为必须从一些 b 开始。 + 仍然是必要的,出现在括号之间。为此,可以将as.associativeFold(!, +) 更改为as.optimizedAssociativeFold(b, !, @, +)

返回++ 是关联的,或者等效地,(B, +) 是一个半群。实际上,编程中使用的大多数半群恰好也是幺半群,即它们在 B 中包含一个中性元素 z(对于 ),因此对于每个 bz + b = b + z = b。在这种情况下,有意义的! 操作很可能是a! = z @ a。此外,z 是一个中性元素b @ a1 ..@ an = (b + z) @ a1 @ an,即b + (z + a1 @ an)。所以总是可以用 z 开始聚合。如果需要b,则在最后执行b + result。有了所有这些假设,我们可以做 as.aggregate(z, @, +)。这就是aggregate 所做的。 @seqop 参数(应用于序列 z @ a1 @ a2 @ ap),+combop(应用于已经部分合并的结果,如在(z + a1@...@ap) + (z + ap+1@...@an))。

总而言之,as.aggregate(z)(seqop, combop) 计算与 as.foldLeft(z)( seqop) 相同的东西,前提是

(B, combop, z) 是一个幺半群 seqop(b,a) = combop(b, seqop(z,a))

聚合实现可以使用组合的关联性来对计算进行分组(但是不交换元素,+ 不必是可交换的, ::: 不是)。它可以并行运行它们。

最后,使用aggregate 解决最初的问题留给读者作为练习。提示:使用foldLeft 实现,然后找到满足上述条件的zcombo

【讨论】:

【参考方案3】:

具有 A 类型元素的集合的签名是:

def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B 
z 是 B 类型的对象,充当中性元素。如果你想计算一些东西,你可以使用 0,如果你想构建一个列表,可以从一个空列表开始,等等。 segop 类似于您传递给 fold 方法的函数。它有两个参数,第一个与您传递的中性元素类型相同,表示在上一次迭代中已经聚合的内容,第二个是您集合的下一个元素。结果还必须是 B 类型。 combop: 是将两个结果合二为一的函数。

在大多数集合中,聚合在TraversableOnce 中实现为:

  def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B 
    = foldLeft(z)(seqop)

因此combop 被忽略。但是,对于并行集合是有意义的,因为seqop 将首先在本地并行应用,然后调用combop 来完成聚合。

因此,对于您的示例,您可以先尝试折叠:

val seqOp = 
  (map:Map[String,Set[String]],tuple: (String,String)) => 
    map + ( tuple._1 -> ( map.getOrElse( tuple._1, Set[String]() ) + tuple._2 ) )


list.foldLeft( Map[String,Set[String]]() )( seqOp )
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))

那么你必须找到一种折叠两个多图的方法:

val combOp = (map1: Map[String,Set[String]], map2: Map[String,Set[String]]) =>
       (map1.keySet ++ map2.keySet).foldLeft( Map[String,Set[String]]() )  
         (result,k) => 
           result + ( k -> ( map1.getOrElse(k,Set[String]() ) ++ map2.getOrElse(k,Set[String]() ) ) ) 
        

现在,您可以并行使用聚合:

list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))

将方法“par”应用于列表,从而使用列表的并行集合(scala.collection.parallel.immutable.ParSeq)来真正利用多核处理器。如果没有“par”,则不会有任何性能提升,因为聚合不是在并行集合上完成的。

【讨论】:

【参考方案4】:

aggregate 类似于foldLeft,但可以并行执行。

作为missingfactor says,aggregate(z)(seqop, combop) 的线性版本等价于foldleft(z)(seqop)。然而,这在并行情况下是不切实际的,在这种情况下,我们不仅需要将下一个元素与前一个结果结合起来(就像在正常折叠中一样),而且我们希望将可迭代对象拆分为子可迭代对象,我们将其称为聚合并需要再次结合这些。 (按从左到右的顺序,但不是关联的,因为我们可能已经在可迭代的第一个部分之前组合了最后一部分。)这种重新组合通常是不平凡的,因此,需要一种方法 (S, S) => S 来做到这一点。

ParIterableLike中的定义是:

def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = 
  executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))

确实使用了combop

供参考,Aggregate 定义为:

protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
  extends Accessor[S, Aggregate[S]] 
    @volatile var result: S = null.asInstanceOf[S]
    def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
    protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
    override def merge(that: Aggregate[S]) = result = combop(result, that.result)

重要的部分是merge,其中combop 应用了两个子结果。

【讨论】:

这个是一个真正“明白”afa aggregate及其在树状结构聚合中的有用性的答案。【参考方案5】:

这里是关于聚合如何在具有基准的多核处理器上启用性能的博客。 http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/

这是“Scala Days 2011”中有关“Scala 并行集合”演讲的视频。 http://days2011.scala-lang.org/node/138/272

视频说明

Scala 并行集合

亚历山大·普罗科佩克

随着处理器内核数量的增加,并行编程抽象变得越来越重要。高级编程模型使程序员能够更多地关注程序,而不是低级细节,例如同步和负载平衡。 Scala 并行集合扩展了 Scala 集合框架的编程模型,提供对数据集的并行操作。 演讲将描述并行收集框架的架构,解释它们的实现和设计决策。将描述具体的集合实现,例如并行哈希映射和并行哈希尝试。最后,将展示几个示例应用程序,在实践中演示编程模型。

【讨论】:

【参考方案6】:

aggregateTraversableOnce源码中的定义是:

def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B = 
  foldLeft(z)(seqop)

这与简单的foldLeft 没有什么不同。 combop 似乎没有在任何地方使用。我自己对这种方法的目的感到困惑。

【讨论】:

【参考方案7】:

只是为了澄清我之前的解释,理论上这个想法是 聚合应该像这样工作,(我已经更改了参数的名称以使其更清晰):

Seq(1,2,3,4).aggragate(0)(
     addToPrev = (prev,curr) => prev + curr, 
     combineSums = (sumA,sumB) => sumA + sumB)

应该在逻辑上翻译成

Seq(1,2,3,4)
    .grouped(2) // split into groups of 2 members each
    .map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
    .foldLeft(0)(sumA,sumB => sumA + sumB)

由于聚合和映射是分开的,所以理论上可以将原始列表分成不同大小的不同组,并行运行,甚至在不同的机器上运行。 实际上,scala 当前的实现默认不支持此功能,但您可以在自己的代码中执行此操作。

【讨论】:

以上是关于Scala 聚合函数的示例的主要内容,如果未能解决你的问题,请参考以下文章

Spark多个动态聚合函数,countDistinct不起作用

2021年大数据常用语言Scala(二十七):函数式编程 聚合操作

如何在scala中获取分层数组的最终元素并在其上应用聚合函数?

Dataframe Spark Scala中的最后一个聚合函数

Spark Scala聚合函数,用于查找组中列值的出现次数

SQL Server聚合函数与聚合开窗函数