Spark中自定义累加器

Posted shun7man

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中自定义累加器相关的知识,希望对你有一定的参考价值。

通过继承AccumulatorV2可以实现自定义累加器。

官方案例可参考:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

下面是我自己写的一个统计卡种数量的案例。

package com.shuai7boy.myscalacode

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

case class Card(var card1Count: Int, var card2Count: Int)

class CalcCardCount extends AccumulatorV2[Card, Card] {
  var result = new Card(0, 0)

  /** *
   * 判断,这个要和reset设定值一致
   *
   * @return
   */
  override def isZero: Boolean = {
    result.card1Count == 0 && result.card2Count == 0
  }

  /** *
   * 复制一个新的对象
   *
   * @return
   */
  override def copy(): AccumulatorV2[Card, Card] = {
    val newCalcCardCount = new CalcCardCount()
    newCalcCardCount.result = this.result
    newCalcCardCount
  }

  /** *
   * 重置每个分区的数值
   */
  override def reset(): Unit = {
    result.card1Count = 0
    result.card2Count = 0
  }

  /**
   * 每个分区累加自己的数值
   *
   * @param v
   */
  override def add(v: Card): Unit = {
    result.card1Count += v.card1Count
    result.card2Count += v.card2Count
  }

  /** *
   * 合并分区值,求得总值
   *
   * @param other
   */
  override def merge(other: AccumulatorV2[Card, Card]): Unit = other match {
    case o: CalcCardCount => {
      result.card1Count += o.result.card1Count
      result.card2Count += o.result.card2Count
    }

  }

  //返回结果
  override def value: Card = result
}


object CardCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("calcCardCountDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val cc = new CalcCardCount
    sc.register(cc)
    val cardList = sc.parallelize(List[String]("card1 1", "card1 3", "card1 7", "card2 5", "card2 2"), 2)
    val cardMapRDD = cardList.map(card => {
      var cardInfo = new Card(0, 0)
      card.split(" ")(0) match {
        case "card1" => cardInfo = Card(card.split(" ")(1).toInt, 0)
        case "card2" => cardInfo = Card(0, card.split(" ")(1).toInt)
        case _ => Card(0, 0)
      }
      cc.add(cardInfo)
    })
    cardMapRDD.count() //执行action,触发上面的累加操作
    println("card1总数量为:" + cc.result.card1Count + ",card2总数量为:" + cc.result.card2Count)
  }
}

打印结果是:

card1总数量为:11,card2总数量为:7 

通过上面代码,就可以同时统计两个变量的值了,当然如果需要更多,可以扩展。默认的累加器只实现了一个。 

以上是关于Spark中自定义累加器的主要内容,如果未能解决你的问题,请参考以下文章

VS Code中自定义Emmet代码片段

活动中自定义对话框内的 ANDROID 片段

Spark篇---SparkSQL中自定义UDF和UDAF,开窗函数的应用

Spark学习之路 Spark的广播变量和累加器

spark源码系列之累加器实现机制及自定义累加器

在 Apache Spark Python 中自定义 K-means 的距离公式