Spark中自定义累加器和广播变量的使用

Posted 爱码农爱生活

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中自定义累加器和广播变量的使用相关的知识,希望对你有一定的参考价值。

自定义累加器

累加器和普通的变量相比,会将executor端的结果,收集到driver端进行汇总

def main(args: Array[String]): Unit = {
   //创建spark并设置app名称
   val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
   //创建sparkcontext ,该对象是提交spark app的入口
   val sc: SparkContext = new SparkContext(conf)

   val rdd: RDD[String] = sc.makeRDD(List("hello", "abc", "hi", "hello", "hi", "ok", "111", "xxx", "hello"))
   val accumlator = new MyAccumlator
   sc.register(accumlator)
   rdd.foreach{word=>{accumlator.add(word)}}
   println(accumlator.value)  //Map(hello -> 3, hi -> 2)
   //关闭连接
   sc.stop()
}

//定义一个类来继承AccumulatorV2
class MyAccumlator extends AccumulatorV2[String,mutable.Map[String,Int]]{
 //定义一个集合,集合单词以及出现次数
 var map: mutable.Map[String, Int] = mutable.Map[String, Int]()
 //是否为初始状态
 override def isZero: Boolean =map.isEmpty
 //拷贝
 override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
   val acc = new MyAccumlator
   acc.map=this.map
   acc
}
 //reset
 override def reset(): Unit = map.clear()
 //向累加器中添加元素
 override def add(v: String): Unit = {
   if(v.startsWith("h")){
     //向可变元素中添加或更新元素
     map(v)= map.getOrElse(v,0)+1
  }
}
 override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
   var map1=map
   var map2=other.value;
   map=map1.foldLeft(map2){
    (mm,kv)=>{
       val k: String = kv._1
       val v: Int = kv._2
       mm(k)=mm.getOrElse(k,0)+v
       mm
    }
  }
}
 override def value: mutable.Map[String, Int] = map
}

累加器执行原理

累加器:分布式共享只写变量

先调用copy,再调用reset ,再调用iszero

merge是计算完毕后自动调用的。

广播变量

分布式共享只读变量

多个executor中使用同一个变量。


def main(args: Array[String]): Unit = {
   //创建spark并设置app名称
   val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
   //创建sparkcontext ,该对象是提交spark app的入口
   val sc: SparkContext = new SparkContext(conf)
//采用集合的方式,实现rdd1,rdd2的join
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1)))
val rdd2 = List(("a", 4), ("b", 5), ("c", 6))
//声明广播变量
val broadCastList: Broadcast[List[(String, Int)]] = sc.broadcast(rdd2)
val resRDD: RDD[(String, (Int, Int))] = rdd1.map {
 case (k, v) => {
   var v3 = 0
   for ((k2, v2) <- broadCastList.value) {
     if (k == k2) {
       v3 = v2
    }
  }
  (k, (v, v3))
}
}
resRDD.collect().foreach(println)

//关闭连接
sc.stop();


以上是关于Spark中自定义累加器和广播变量的使用的主要内容,如果未能解决你的问题,请参考以下文章

Spark入门3(累加器和广播变量)

Spark(20)——广播变量和累加器

Spark 累加器与广播变量

Spark 系列—— 累加器与广播变量

Spark 系列—— 累加器与广播变量

Spark编程进阶