Spark的fold()和aggregate()函数

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的fold()和aggregate()函数相关的知识,希望对你有一定的参考价值。

参考技术A

最近在学习spark,理解这两个函数时候费了一些劲,现在记录一下。

说到 fold() 函数,就不得不提一下 reduce() 函数,他俩的区别就在于一个初始值。
reduce() 函数是这样写的:

参数是一个函数,这个函数的对rdd中的所有数据进行某种操作,比如:

对于这个 x ,它代指的是返回值,而 y 是对rdd各元素的遍历。
意思是对 l 中的数据进行累加。
flod() 函数相比 reduce() 加了一个初始值参数:

scala的语法确实是比较奇怪的,既然有两个参数,你就不能把两个参数放在一个括号里吗?也是醉了,这种写法确实容易让人迷惑。

这个计算其实 0 + 1 + 2 + 3 + 4 ,而 reduce() 的计算是: 1 + 2 + 3 + 4 ,没有初始值,或者说rdd的第一个元素值是它的初始值。

刚才说到 reduce() 和 fold() ,这两个函数有一个问题,那就是它们的返回值必须与rdd的数据类型相同,啥意思呢?比如刚才那个例子, l 的数据是 Int ,那么 reduce() 和 flod() 返回的也必须是 Int 。
aggregate() 函数就打破了这个限制。比如我返回 (Int, Int) 。这很有用,比如我要计算平均值的时候。
要算平均值,我就有两个值是要求的,一个是rdd的各元素的累加和,另一个是元素计数,我初始化为 (0, 0) 。
那么就是:

那么 seqOp 和 combOp 怎么写呢?而combOp又是啥意思呢?
我们将seqOp写为:

这啥意思?
在讲到 reduce() 函数的时候我说:

对于这个 x ,它代指的是返回值,而 y 是对rdd各元素的遍历。
在 aggregate() 这也一样, x 不是返回值吗,我返回值是 (Int, Int) 啊,它有两个元素啊,我可以用 x._1 和 x._2 来代指这两个元素的, y 不是rdd的元素遍历吗,那我 x._1 + y 就是各个元素的累加和啊, x._2 + 1 就是元素计数啊。遍历完成后返回的 (Int, Int) 就是累加和和元素计数啊。
按理说有这么一个函数就应该结束了,后边那个 combOp 是干嘛的?
因为我们的计算是分布式计算,这个函数是将累加器进行合并的。
例如第一个节点遍历1和2, 返回的是 (3, 2) ,第二个节点遍历3和4, 返回的是 (7, 2) ,那么将它们合并的话就是 3 + 7, 2 + 2 ,用程序写就是

最后程序是这样的:

m 就是所要求的均值。

scala - fold,aggregate,iterator

  import org.json4s._
  import org.json4s.jackson._
  import org.json4s.jackson.JsonMethods._
  import org.json4s.JsonDSL._
  import org.json4s.JsonDSL.map2jvalue
  // or
  //import org.json4s.JsonDSL.WithDouble._
  //import org.json4s.JsonDSL.WithBigDecimal._
  //import scala.io.Source
  //import scala.collection.JavaConverters._
  //import scala.collection.mutable.Buffer
  import java.io.{File,PrintWriter}
  import java.util.Date
  implicit val formats = Serialization.formats(ShortTypeHints(List()))

使用json4s进行json操作,踩了scala一个坑。

    val data = readConf(file)
    val t = data.head
    val r = "Response"
    val n = "Title"
    val p = "Plot"
    val contents = data.filter( row => (row \ r).extract[String].toBoolean).filter(row => (row \ p).extract[String] != "N/A")
    val plots = contents.map{
      row =>
        val title = (row \ n).values.toString
        val plots = (row \ p)
        title -> plots
    }

 

三种方式实现类元素的拼接:要注意操作符函数的参数顺序。

    // val plotsIter = plots.iterator
    // var rstObj = plotsIter.next ~ plotsIter.next
    // while(plotsIter.hasNext) rstObj = plotsIter.next ~ rstObj
    val (head,tail) = plots.splitAt(2)
    val rstObj = tail.foldRight(head.head ~ head.last)(_ ~ _)
    val rstObj = tail.aggregate(head.head ~ head.last)(_ ~ _, _ ~ _)

fold对操作符参数传参顺序和定义顺序一致。

fold(x)((a,b)=>foo(a,b))

先占坑,等忙过这阵子,具体给出试错经过和结论分析。

以上是关于Spark的fold()和aggregate()函数的主要内容,如果未能解决你的问题,请参考以下文章

Spark_RDD之简单Java函数接口

Spark:Reduce()与Fold()之间的区别[重复]

Spark MLlib 之 aggregate和treeAggregate从原理到应用

Spark操作:Aggregate和AggregateByKey

Scala 学习之 aggregate函数

Spark K-fold 交叉验证