为啥我的spark累加器一直为0
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为啥我的spark累加器一直为0相关的知识,希望对你有一定的参考价值。
参考技术A 累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数, 累加器值未累加 我们都知道,spark中的rdd是惰性计算的,在没有遇到action算子之前是并没有真正开始计算的,也就是说累加器没有真正的进行累加。累加器值进行多次累加
Spark 累加器
由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量,
对于其概念与理解可以参考:共享变量(广播变量和累加器) 。可能需要注意:Spark累加器(Accumulator)陷阱及解决办法
因此,我们便可以利用累加器与广播变量来构造一些比较常用的关系,以Map的形式广播出去,提高效率。
如下通过累加器构造了一个DF数据间
的映射关系,
defgetMap(spark:SparkSession,data:DataFrame){ //通过collectionAccumulator构造Map关系 valmyAccumulator=spark.sparkContext.collectionAccumulator[(String,Long)] data.foreach( row=>{ valname=row.getAs[String]("name") valage=row.getAs[Long]("age") myAccumulator.add(name,age) } ) valaiterator:util.Iterator[(String,Long)]=myAccumulator.value.iterator() varnewMap:Map[String,Long]=Map() while(aiterator.hasNext){ vala=aiterator.next() valkey=a._1 valvalue=a._2 if(!newMap.contains(key)){ newMap+=(key->value) } else{ valoldvalue=newMap(key) newMap+=(key->(oldvalue+value)) } } }
以上是关于为啥我的spark累加器一直为0的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streamming 共享变量之_ 如何正确使用累加器