Spark Streamming 共享变量之_ 如何正确使用累加器

Posted brentboys

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streamming 共享变量之_ 如何正确使用累加器相关的知识,希望对你有一定的参考价值。

累加器:

  场景:各种计数问题,这个计算需要在driver端合并。

  作用:解决Driver端和Execute端数据共享问题。

    技术图片

 

   如图,需要将Driver端变量备份到Executor端,那么copy到Executor端的变量一定要是Executor级别的变量。那么如何自定义一个累加器呢,如何将累加器数据类型定义为Executor级别呢?

  自定义累加器:

  首先要继承AccumulatorV2,然后重写如下6个方法

    add方法:指定元素相加。

    copy方法:指定了对自定义累加器的复制操作。

    isZero方法:返回该累加值是否为0

    merge方法:合并两个相同类型的累加器。

    reset方法:重置累加器。

    value方法:返回累加器当前的值。

    技术图片

 

     技术图片

 

    如何保证Executor端的变量级别是Executor级别的呢?

    那么就是实现copy方法的时候做如下操作。

    技术图片

    技术图片

 

 

    如何在spark streamming程序进行定义呢?

    第一行是new 对象,并且通过构造器初始化。

    第二行是将累加器注册到sc中,并且起别名为AccumulatorV2Day

    技术图片

 

 

    如何在spark streamming程序中更新操作呢?

    在action算子中进行更新

    foreachPartition就是一个action算子。

    在其中执行add操作。

    技术图片

 

    如果获取到累加器value的值呢?将累加器.value写在流级别行吗?

    下面这一行代码的位置

    累加器.value不能写在这里

    DS.foreachRDD(rdd=>{

      rdd.forPartition(p=>{

        累加器.value不能写在这里

      })

      累加器.value应该写在这里。

    })

    累加器.value不能写在这里

    技术图片

 

   如何在spark streaming UI查看累加器相关数据呢?

  点击有数据的日期

  技术图片

   点击1653

 

  技术图片

  点击图中箭头的地方。

 

  技术图片

 

 

 

  drive端集合后的值

 

  技术图片

 

  各个分区的累加的值

  技术图片

 

 

 

 

  写的不好,又不对的地方还请指教。

 

 

 

 

 

 

 

 

 

 

 

 

 

    

 

      

    

 

 

 

 

 

  

 

 

 

以上是关于Spark Streamming 共享变量之_ 如何正确使用累加器的主要内容,如果未能解决你的问题,请参考以下文章

[java][spark][spark streamming]java.util.concurrent.TimeoutException: Futures timed out

[java][spark streamming]java.lang.IllegalArgumentException: requirement failed: No output operations

Spark2.4.8 共享变量之累加器

9.spark core之共享变量

Spark共享变量

3天掌握Spark--RDD 共享变量