Spark操作:Aggregate和AggregateByKey

Posted 代码空间

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark操作:Aggregate和AggregateByKey相关的知识,希望对你有一定的参考价值。

1. Aggregate

Aggregate即聚合操作。直接上代码:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateTest {

  def main(args:Array[String]) = {

    // 设置运行环境
    val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)

    var data = List(2,5,8,1,2,6,9,4,3,5)
    var res = data.par.aggregate((0,0))(
      // seqOp
      (acc, number) => (acc._1+number, acc._2+1),
      // combOp
      (par1, par2) => (par1._1+par2._1, par1._2+par2._2)
    )

    println(res)

    sc.stop
  }

}

acc即(0,0),number即data,seqOp将data的值累加到Tuple的第一个元素,将data的个数累加到Tuple的第二个元素。由于没有分区,所以combOp是不起作用的,这个例子里面即使分区了,combOp起作用了,结果也是一样的。

运行结果:

(45,10)

2. AggregateByKey

AggregateByKey和Aggregate差不多,也是聚合,不过它是根据Key的值来聚合。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/6/13.
  */
object AggregateByKeyTest {

  def main(args:Array[String]) = {

    // 设置运行环境
    val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)

    val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
    val rdd = sc.parallelize(data)

    val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)(
      // seqOp
      math.max(_,_),
      // combOp
      _+_
    )

    res.collect.foreach(println)
    sc.stop
  }

}

根据Key值的不同,可以分为3个组:

(1)  (1,3),(1,2),(1,4);

(2)  (2,3);

(3)  (3,6),(3,8)。

这3个组分别进行seqOp,也就是(K,V)里面的V和0进行math.max()运算,运算结果和下一个V继续运算,以第一个组为例,运算过程是这样的:

0, 3 => 3

3, 2 => 3

3, 4 => 4

所以最终结果是(1,4)。combOp是对把各分区的V加起来,由于这里并没有分区,所以实际上是不起作用的。

运行结果:

(2,3)
(1,4)
(3,8)

如果生成RDD时分成3个区:

val rdd = sc.parallelize(data,3)

运行结果就变成了:

(3,8)
(1,7)
(2,3)

这是因为一个分区返回(1,3),另一个分区返回(1,4),combOp将这两个V加起来,就得到了(1,7)。

以上是关于Spark操作:Aggregate和AggregateByKey的主要内容,如果未能解决你的问题,请参考以下文章

spark aggregate函数

如何使用 db.command 获取 Python Mongodb Aggregate 解释?

Spark MLlib 之 aggregate和treeAggregate从原理到应用

Django的注释和聚合方法之间的区别?

轻松理解 Spark 的 aggregate 方法

Spark SQL functions.scala 源码解析Aggregate functions(基于 Spark 3.3.0)