scala 流计算之 aggregate()
Posted smile-yan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala 流计算之 aggregate()相关的知识,希望对你有一定的参考价值。
函数参数详解
def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B
B
: 函数返回结果的数据类型;z
:聚类前的参数的初始化值;seqop
:是用于序列运算的运算符,用于计算所述集合中每个元素的总和,并枚举集合中元素的总数。combop
:是一个组合运算符,用于组合通过集合的并行计算获得的结果。
例子 1
可能还是有些难以理解,这里摘录一个例子:
// Scala program of aggregate()
// function
// Creating an object
object GfG
// Main method
def main(args: Array[String])
// Creating a list of numbers
val s = List(1, 2, 3, 4)
// Applying aggregate function
val r = s.par.aggregate((0, 0))((s, r) =>(s._1 + r, s._2 + 1),
(s,r) => (s._1 + r._1, s._2 + r._2))
// Displays summation of all the
// elements in the list and also
// total number of elements
println("(Sum of all the elements , total number of elements) = "+r)
输出结果为:
(10, 4)
这里我们说明一下整个计算步骤过程:
s
的初始化值为(0, 0)
,r
是指所求数组的某个元素(注意由于是并行计算,所以并不清楚取数的顺序)- 计算符合
1
如图所示,序号 1
为 s._1
的 第一次
的更新 s._1
的公式,假设第一次取出的数 r=1
,那么 s._1 = s._1 + r = 1
。注意序号 1
的 +
与后面的序号 3
的框里面的 +
没有关系
,序号3框里的 +
,是指我们代码中 (s,r) => (s._1 + r._1, s._2 + r._2))
,这个式子里第一个 +
号,表示取出数以后怎么计算。所以整个流程左边可以理解为 s._1
初始值为 0
,首次更新后为 1
,接着运行序号 3
框里的运算符,最终结果为 10
。
如图所示,序号 2
为 s._2
的更新方法,也就是不断 +1
,这个与序号 1
框里类似,序号 1
每次都取出数组里的一个数,这里每次都取 1
。同样地需要注意
的是,序号 2
的 +
符号只运行一次,序号 4
框里的 +
均来自于代码中 (s,r) => (s._1 + r._1, s._2 + r._2))
右边一部分。
为了更加直观,我们直接看 例 2。
例子 2
注意注释的地方,我们把之前的 +
改成了 *
。
object StreamCalc
def main(args: Array[String]): Unit =
// Creating a list of numbers
val s = List(1, 2, 3, 4)
// Applying aggregate function
val r = s.par.aggregate((0, 0))((s, r) =>
(s._1 + r, s._2 + 1)
, (p, q) =>
// 这个地方把两个 + 改成 *
(p._1 * q._1, p._2 * q._2)
)
// Displays summation of all the
// elements in the list and also
// total number of elements
println("(Sum of all the elements , total number of elements) = " + r)
输出的结果为:
(Sum of all the elements , total number of elements) = (24,1)
总结
这两个例子分别可以用来求数组的和以及数组中各个元素的乘积,尽管这个操作非常好用,而且在数值数目较大时可以大大提高运算效率。因而如果使用的是 scala 语言计算大数组的和、差、绝对值之和、所有元素的乘积等等,都可以考虑使用这个函数。
Smileyan
2023.01.09 21:00
以上是关于scala 流计算之 aggregate()的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL functions.scala 源码解析Aggregate functions(基于 Spark 3.3.0)