Scala 聚合函数的示例
Posted
技术标签:
【中文标题】Scala 聚合函数的示例【英文标题】:Example of the Scala aggregate function 【发布时间】:2011-10-19 04:38:35 【问题描述】:我一直在寻找,但找不到我能理解的 Scala 中 aggregate
函数的示例或讨论。它看起来很强大。
这个函数可以用来减少元组的值来创建一个多图类型的集合吗?例如:
val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))
应用聚合后:
Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv"))
另外,你能给出参数z
、segop
和combop
的例子吗?我不清楚这些参数的作用。
【问题讨论】:
【参考方案1】:让我们看看一些 ascii 艺术是否没有帮助。考虑aggregate
的类型签名:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
另外,请注意A
指的是集合的类型。所以,假设我们在这个集合中有 4 个元素,那么 aggregate
可能会像这样工作:
z A z A z A z A
\ / \ /seqop\ / \ /
B B B B
\ / combop \ /
B _ _ B
\ combop /
B
让我们看一个实际的例子。假设我有一个GenSeq("This", "is", "an", "example")
,我想知道其中有多少个字符。我可以写如下:
注意在下面的 sn-p 代码中使用了par
。传递给聚合的第二个函数是在计算单个序列之后调用的函数。 Scala 只能对可以并行化的集合执行此操作。
import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)
所以,首先它会计算:
0 + "This".length // 4
0 + "is".length // 2
0 + "an".length // 2
0 + "example".length // 7
它接下来会做什么无法预测(组合结果的方法不止一种),但它可能会这样做(就像上面的 ascii 艺术):
4 + 2 // 6
2 + 7 // 9
在什么时候结束
6 + 9 // 15
给出最终结果。现在,这在结构上与foldLeft
有点相似,但它有一个额外的功能(B, B) => B
,这是 fold 没有的。但是,此功能使其能够并行工作!
例如,考虑一下,四个初始计算中的每一个都是相互独立的,并且可以并行完成。接下来的两个(导致 6 和 9)可以在它们所依赖的计算完成后启动,但是这两个可以也并行运行。
如上所述并行化的 7 次计算只需同时进行 3 次串行计算。
实际上,对于如此小的集合,同步计算的成本将足以抵消任何收益。此外,如果你折叠它,总共只需要 4 次计算。但是,一旦您的收藏变得更大,您就会开始看到一些真正的收益。
另一方面,考虑foldLeft
。因为它没有附加功能,所以它不能并行任何计算:
(((0 + "This".length) + "is".length) + "an".length) + "example".length
必须先计算每个内括号,然后才能进行外括号。
【讨论】:
我们可以说这类似于map reduce,seqop
扮演mapper
函数,combop
扮演reducer
函数?我也是一个新手,并试图理解语义。感谢 ASCII 艺术,绝对有帮助!
是的。树聚合结构是aggregate
存在的关键。
这很令人困惑,因为在您的示例中 combop 从未被调用。你可以通过简单地为第二个参数做任何你想做的事情来看到这一点。例如返回数字 11242414,你仍然得到相同的答案 15。
我对此进行了更多研究,发现从未调用 combop 函数,因为您使用的集合不可并行化。如果您在聚合之前调用par
,它将确保调用组合。【参考方案2】:
聚合函数不这样做(除了它是一个非常通用的函数,并且可以用来这样做)。你想要groupBy
。至少接近。当您从Seq[(String, String)]
开始时,您通过获取元组中的第一项(即(String, String) => String)
,它将返回Map[String, Seq[(String, String)]
)进行分组。然后,您必须丢弃 Seq[String, String)] 值中的第一个参数。
所以
list.groupBy(_._1).mapValues(_.map(_._2))
你会得到一个Map[String, Seq[(String, String)]
。如果您想要 Seq
而不是 Map
,请在结果上调用 toSeq
。不过,我认为您不能保证生成的 Seq 中的顺序
聚合是一个更难的函数。
首先考虑 reduceLeft 和 reduceRight。
让as
是A
类型元素的非空序列as = Seq(a1, ... an)
,而f: (A,A) => A
是将A
类型的两个元素组合成一个的某种方式。我会将它记为二元运算符@
、a1 @ a2
而不是f(a1, a2)
。 as.reduceLeft(@)
将计算 (((a1 @ a2) @ a3)... @ an)
。 reduceRight
将括号放在相反的位置,(a1 @ (a2 @... @ an))))
。如果@
恰好是关联的,则不会关心括号。可以将其计算为(a1 @... @ ap) @ (ap+1 @...@an)
(两个大括号内也会有括号,但我们不要在意)。然后可以并行执行这两个部分,而 reduceLeft 或 reduceRight 中的嵌套括号强制执行完全顺序计算。但是只有当@
被认为是关联的并且reduceLeft 方法不知道这一点时才可能进行并行计算。
仍然可以有方法reduce
,其调用者将负责确保操作是关联的。然后reduce
将按照它认为合适的方式对调用进行排序,可能并行进行。确实有这样的方法。
然而,各种 reduce 方法都有一个限制。 Seq 的元素只能组合成相同类型的结果:@
必须是 (A,A) => A
。但是一个更普遍的问题是将它们组合成一个B
。一个以B
类型的值b
开始,并将其与序列的每个元素组合。运算符@
是(B,A) => B
,计算(((b @ a1) @ a2) ... @ an)
。 foldLeft
就是这样做的。 foldRight
做同样的事情,但从 an
开始。在那里,@
操作没有机会关联。当一个人写b @ a1 @ a2
时,它一定意味着(b @ a1) @ a2
,因为(a1 @ a2)
将是错误的类型。所以 foldLeft 和 foldRight 必须是顺序的。
然而,假设每个A
都可以变成B
,让我们用!
来写它,a!
的类型是B
。此外假设有一个+
操作(B,B) => B
,并且@
使得b @ a
实际上是b + a!
。与其用@组合元素,不如先用!
将它们全部转换为B,然后用+
组合它们。那将是as.map(!).reduceLeft(+)
。如果+
是关联的,那么可以使用reduce 来完成,而不是顺序的:as.map(!).reduce(+)。可能有一个假设的方法 as.associativeFold(b, !, +)。
Aggregate 非常接近这一点。然而,可能有一种比b+a!
更有效的方法来实现b@a
例如,如果类型B
是List[A]
,而b@a 是a::b,那么a!
将是a::Nil
和 b1 + b2
将是 b2 ::: b1
。 a::b 比 (a::Nil):::b 好得多。要从关联性中受益,但仍使用@
,首先将b + a1! + ... + an!
拆分为(b + a1! + ap!) + (ap+1! + ..+ an!)
,然后返回使用@
和(b @ a1 @ an) + (ap+1! @ @ an)
。一个仍然需要!在 ap+1 上,因为必须从一些 b 开始。 + 仍然是必要的,出现在括号之间。为此,可以将as.associativeFold(!, +)
更改为as.optimizedAssociativeFold(b, !, @, +)
。
返回+
。 +
是关联的,或者等效地,(B, +)
是一个半群。实际上,编程中使用的大多数半群恰好也是幺半群,即它们在 B 中包含一个中性元素 z
(对于 零),因此对于每个 b
、z + b
= b + z
= b
。在这种情况下,有意义的!
操作很可能是a! = z @ a
。此外,z 是一个中性元素b @ a1 ..@ an = (b + z) @ a1 @ an
,即b + (z + a1 @ an)
。所以总是可以用 z 开始聚合。如果需要b
,则在最后执行b + result
。有了所有这些假设,我们可以做 as.aggregate(z, @, +)
。这就是aggregate
所做的。 @
是 seqop
参数(应用于序列 z @ a1 @ a2 @ ap
),+
是 combop
(应用于已经部分合并的结果,如在(z + a1@...@ap) + (z + ap+1@...@an)
)。
总而言之,as.aggregate(z)(seqop, combop)
计算与 as.foldLeft(z)( seqop)
相同的东西,前提是
(B, combop, z)
是一个幺半群
seqop(b,a) = combop(b, seqop(z,a))
聚合实现可以使用组合的关联性来对计算进行分组(但是不交换元素,+ 不必是可交换的, ::: 不是)。它可以并行运行它们。
最后,使用aggregate
解决最初的问题留给读者作为练习。提示:使用foldLeft
实现,然后找到满足上述条件的z
和combo
。
【讨论】:
【参考方案3】:具有 A 类型元素的集合的签名是:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
z
是 B 类型的对象,充当中性元素。如果你想计算一些东西,你可以使用 0,如果你想构建一个列表,可以从一个空列表开始,等等。
segop
类似于您传递给 fold
方法的函数。它有两个参数,第一个与您传递的中性元素类型相同,表示在上一次迭代中已经聚合的内容,第二个是您集合的下一个元素。结果还必须是 B
类型。
combop
: 是将两个结果合二为一的函数。
在大多数集合中,聚合在TraversableOnce
中实现为:
def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B
= foldLeft(z)(seqop)
因此combop
被忽略。但是,对于并行集合是有意义的,因为seqop
将首先在本地并行应用,然后调用combop
来完成聚合。
因此,对于您的示例,您可以先尝试折叠:
val seqOp =
(map:Map[String,Set[String]],tuple: (String,String)) =>
map + ( tuple._1 -> ( map.getOrElse( tuple._1, Set[String]() ) + tuple._2 ) )
list.foldLeft( Map[String,Set[String]]() )( seqOp )
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
那么你必须找到一种折叠两个多图的方法:
val combOp = (map1: Map[String,Set[String]], map2: Map[String,Set[String]]) =>
(map1.keySet ++ map2.keySet).foldLeft( Map[String,Set[String]]() )
(result,k) =>
result + ( k -> ( map1.getOrElse(k,Set[String]() ) ++ map2.getOrElse(k,Set[String]() ) ) )
现在,您可以并行使用聚合:
list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
将方法“par”应用于列表,从而使用列表的并行集合(scala.collection.parallel.immutable.ParSeq)来真正利用多核处理器。如果没有“par”,则不会有任何性能提升,因为聚合不是在并行集合上完成的。
【讨论】:
【参考方案4】:aggregate
类似于foldLeft
,但可以并行执行。
作为missingfactor says,aggregate(z)(seqop, combop)
的线性版本等价于foldleft(z)(seqop)
。然而,这在并行情况下是不切实际的,在这种情况下,我们不仅需要将下一个元素与前一个结果结合起来(就像在正常折叠中一样),而且我们希望将可迭代对象拆分为子可迭代对象,我们将其称为聚合并需要再次结合这些。 (按从左到右的顺序,但不是关联的,因为我们可能已经在可迭代的第一个部分之前组合了最后一部分。)这种重新组合通常是不平凡的,因此,需要一种方法 (S, S) => S
来做到这一点。
ParIterableLike
中的定义是:
def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S =
executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
确实使用了combop
。
供参考,Aggregate
定义为:
protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
extends Accessor[S, Aggregate[S]]
@volatile var result: S = null.asInstanceOf[S]
def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
override def merge(that: Aggregate[S]) = result = combop(result, that.result)
重要的部分是merge
,其中combop
应用了两个子结果。
【讨论】:
这个是一个真正“明白”afaaggregate
及其在树状结构聚合中的有用性的答案。【参考方案5】:
这里是关于聚合如何在具有基准的多核处理器上启用性能的博客。 http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/
这是“Scala Days 2011”中有关“Scala 并行集合”演讲的视频。 http://days2011.scala-lang.org/node/138/272
视频说明
Scala 并行集合
亚历山大·普罗科佩克
随着处理器内核数量的增加,并行编程抽象变得越来越重要。高级编程模型使程序员能够更多地关注程序,而不是低级细节,例如同步和负载平衡。 Scala 并行集合扩展了 Scala 集合框架的编程模型,提供对数据集的并行操作。 演讲将描述并行收集框架的架构,解释它们的实现和设计决策。将描述具体的集合实现,例如并行哈希映射和并行哈希尝试。最后,将展示几个示例应用程序,在实践中演示编程模型。
【讨论】:
【参考方案6】:aggregate
在TraversableOnce
源码中的定义是:
def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B =
foldLeft(z)(seqop)
这与简单的foldLeft
没有什么不同。 combop
似乎没有在任何地方使用。我自己对这种方法的目的感到困惑。
【讨论】:
【参考方案7】:只是为了澄清我之前的解释,理论上这个想法是 聚合应该像这样工作,(我已经更改了参数的名称以使其更清晰):
Seq(1,2,3,4).aggragate(0)(
addToPrev = (prev,curr) => prev + curr,
combineSums = (sumA,sumB) => sumA + sumB)
应该在逻辑上翻译成
Seq(1,2,3,4)
.grouped(2) // split into groups of 2 members each
.map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
.foldLeft(0)(sumA,sumB => sumA + sumB)
由于聚合和映射是分开的,所以理论上可以将原始列表分成不同大小的不同组,并行运行,甚至在不同的机器上运行。 实际上,scala 当前的实现默认不支持此功能,但您可以在自己的代码中执行此操作。
【讨论】:
以上是关于Scala 聚合函数的示例的主要内容,如果未能解决你的问题,请参考以下文章
Spark多个动态聚合函数,countDistinct不起作用
2021年大数据常用语言Scala(二十七):函数式编程 聚合操作
如何在scala中获取分层数组的最终元素并在其上应用聚合函数?