Spark2.4.8 共享变量之累加器

Posted 若兰幽竹

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark2.4.8 共享变量之累加器相关的知识,希望对你有一定的参考价值。

一、共享变量


通常,当传递给Spark操作(例如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,远程机器上变量的更新不会传播回驱动程序。支持跨任务的通用、读写共享变量将是低效的。但是,Spark为两种常见的使用模式提供了两种有限的共享变量类型:广播变量和累加器。

spark通过广播变量和累加器实现共享变量。

二、累加器


累加器是只能通过关联和交换操作添加的变量,因此可以有效地并行支持。它们可以用于实现计数器(如MapReduce)或求和。Spark本地支持数字类型的累加器,程序员可以添加对新类型的支持。作为用户,您可以创建命名或未命名的累加器。如下图所示,一个命名的累加器(在这个实例计数器中)将显示在修改累加器的stage的web UI中。Spark在Tasks表中显示任务修改的每个累加器的值。

三、基础演示


  • 需求描述:利用累加器统计出worker上所有task运行次数
  • 演示环境:三台虚拟机组成的集群
  • 运行环境:spark-shell
  • 步骤:
    • 启动spark-shell:
      bin/spark-shell --master spark://niit01:7077
    • 编写如下代码:
      scala> val ac1 = sc.longAccumulator("ac1")
      ac1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(ac1), value: 0)
      scala> sc.parallelize(1 to 20).map(x => {ac1.add(1);x+1}).reduce(_+_) // 在map中统计出当前的map task 运行的次数
      res0: Int = 230  // reduce的结果
      
      scala> ac1.value // 获取累加器的值
      res1: Long = 20 // 累加器进行了20次
      

四、实验案例


  1. 实验要求:利用累加器完成对特定条件下的task任务运行次数统计,如统计出某数列中的奇数项被计算的次数
  2. 代码如下:
    	
    scala> val ac1 = sc.longAccumulator("ac1") // 创建累加器
    scala> ac1.value  // 获取累加器初始值
    res0: Long = 0
    
    scala> sc.parallelize(1 to 20).map(x => { // 统计奇数被执行的次数
         | if (x % 2 != 0)
         |   {ac1.add(1) }
         | x}).reduce(_+_)
         
    scala> ac1.value // 获取累加器的数值,只被累加了10次
    res2: Long = 10
    
  3. 在webui上查看,如下:

五、简要分析


  • 上述的实验案例在webui上查看发现两台worker节点上每台节点上各自执行了5次。故,对于任意一个job而言,其所有的task的总数是集群中所有task之和。另外,对于累加器而言,每个task都可以共享累加器,底层利用分布式锁来确保每个task在使用累加器前其值都是最新的,每个task在原来的数值上进行累计。下面我们来查看下每个task运行时累加器的数值变化:
    把上面的代码改写成如下:
    scala> val ac2 = sc.longAccumulator("ac2") // 创建累加器
    scala> sc.parallelize(1 to 20).map(x => {
         | if( x % 2 != 0)
         | {
         |
         |
         |   var hostname = java.net.InetAddress.getLocalHost.getHostName;
         |   val ac2Val = ac2.value
         |   var str = hostname + " : " + Thread.currentThread().getName + " => ac2 before add: " + ac2Val;
         |   ac2.add(1)
         |   val ac2Val2 = ac2.value
         |   str = str + " => ac2 after add: " + ac2Val2 + "\\r\\n"
         |   val socket = new java.net.Socket(hostname,9999);
         |   val out = socket.getOutputStream;
         |   out.write(str.getBytes())
         |   out.flush()
         |   out.close()
         |   socket.close()
         | }
         | x }).reduce(_+_)
    
    注意:上述加入了Socket,故需要在worker节点上按照nc进行查看效果,关于nc的按照有两种,推荐使用在线安装,执行: yum install -y nc
  • 运行上述代码后查看其他两台节点控制台输出,查看前需要分别在两台从节点上启动nc,执行: nc -lk 9999 回车,效果如下:
    • niit02从节点
    • niit03从节点

      在web ui上查看,如下所示:

      每个Task对累加器更新了5次。
      至此,对累加器的介绍就到这里,如对你有所帮助,你的赞将是对我最大的鼓励,谢谢~!!

以上是关于Spark2.4.8 共享变量之累加器的主要内容,如果未能解决你的问题,请参考以下文章

Spark基础学习笔记21:RDD检查点与共享变量

Spark 累加器

Spark(20)——广播变量和累加器

Spark 累加器与广播变量

Spark 系列—— 累加器与广播变量

Spark 系列—— 累加器与广播变量