[Spark]-RDD详解之变量&操作

Posted nightpxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Spark]-RDD详解之变量&操作相关的知识,希望对你有一定的参考价值。

RDD的操作

  1.1 概述

      RDD整体包含两大类操作

      transformation 从现有中创建一个新的数据集

      action 在对数据集做一定程度的计算后将结果返回

         以MapReduce来说,Map就是一个transformation ,它是从每个文件块上执行一个方法来抽取转换,最终形成一个新的数据集.而Reduce就是一个action,它在对数据集执行一个函数进行计算后返回一个结果

    对于所有的transformation,都是Lazy的,也就是说它不会立即执行,只是单纯的记住怎么样从原来的数据集进行转换的逻辑而已,它仅在某一个计算需要的情况下,才会被真正执行.

    因为transformation 的Lazy性,RDD支持在每次计算时都进行重新计算,当然你可以将这个RDD保存下来 (persist  or cache方法)避免每次重计算

      这种保存既可以是硬盘,也可以是内存,甚至可以选择同步多个副本到多个节点中

    1.2 集群环境下的操作

    集群环境,所有操作最终会交给executors去执行.而变量,会以数据副本的形式交给executors.很多时候,这与我们非集群环境下的开发思维有非常大的不同.

    1.2.1 集群下的闭包

        RDD是支持闭包操作的.但务必注意的是Spark不保证对闭包之外的对象引用进行的变化.

        原因是闭包的会被序列化发生给每一个executor,对于闭包的之外的对象引用会拷贝一个副本给executor.这时多个executor执行至少是跨JVM的

        这时对这个副本对象的变更没有任何意义,因为每个JVM(executor)的副本都是独立的.

    1.2.2 集群下的print

      集群环境下,print不会在driver端有任何输出.

      原因也是一样,print最终是在每个executor执行,其输出也是在每个executor的stdout上,在driver端,是不会有这些输出的.

      如果想在driver输出,一个比较简单的办法是调用collect()将结果发送到driver端在进行print,但这样可能会造成driver内存爆掉(所有executor的数据涌入).比较推荐的做法是rdd.take(100).foreach(println)

     1.2.3 共享变量

      因为集群下,变量只会以副本方式交给executor,而不会将变量的值变化传回driver,所以一个可读写共享的变量是非常有用的.

      Spark提供了两种共享变量 broadcast(广播变量) 和 accumulators(累加器)

      1.2.3.1 广播变量(broadcast)

        广播变量允许将一个只读变量的副本发送到每个机器上(executor机器),而不是对每一个任务发送一个副本.这样在同一机器上的多个任务,就可以反复使用这个变量了.      

        注意:

          广播变量只会对每个节点分发一次,所以一般来说,广播变量不应该再被修改了.以保证每个广播变量的副本的值都是一致的

          如果广播变量被修改,则需要将广播变量重新分发

        另:

          举个例子:Spark的action操作本身是通过一系列的stage来完成的,这些Stage是通过分布式的shuffle操作来进行切分的.而在每个Stage之间,Spark自动使用广播变量.

          这里用法说明,只有数据本身会在多个Stage的多个任务中反复使用,或者说缓存这个数据是非常重要且非常必要的情况下,使用广播变量才有意义.

        广播变量的使用如下:      

          // SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值
          val broadcastVar = sc.broadcast(Array(1, 2, 3))
          val v = broadcastVar.value

      1.2.3.2 累加器(accumulators) 

        累加器变量仅支持累加操作,因为可以在并行计算执行一些特殊的计算(比计数或者求和).并且累加器的变化是可以在UI的Task界面上看见的(注,不支持Python)

        累加器操作,依然遵循RDD的Lazy原则:

          累加器更新操作是在Action中,并且在每个任务中只会执行一次(如果任务失败重启了,累加器更新不会执行)

          而在transformation中,累加器依然不会立即执行更新,如果transformation被重新执行了,则累加器操作会重复执行

        对于累加器变量,Spark原生支持数值类型.一个使用例子如下        

          val accum = sc.longAccumulator("My Accumulator")
          sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
          println(accum.value)

         也可以创建继承AccumulatorV2的类型,来实现自定义类型的累加器,例子如下:          

          //两个泛型参数->累加的元素类型和结果类型可以不同的
          class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

            private val myVector: MyVector = MyVector.createZeroVector

            def reset(): Unit = {
              myVector.reset()
            }

            def add(v: MyVector): Unit = {
              myVector.add(v)
            }
            ...
          }

          // 创建一个自定义的累加器类型:
          val myVectorAcc = new VectorAccumulatorV2
          //将这个触发器注册到SparkContext:
          sc.register(myVectorAcc, "MyVectorAcc1")

  1.3 RDD的一些基本操作

    1.3.1 Transformations 操作

      map 将原来RDD中的每个项,用自定义的map函数进行映射转变为新的元素,并返回一个新的RDD

      filter 对原来RDD进行过滤,将过滤的结果返回为一个新的RDD

      flatMap 与map类似

    1.3.2 Action 操作

  1.4 Shuffle过程

    Spark的某些操作,会引起一个Shuffle过程.Shuffle是指不同节点上的不同分区数据整合重新分区分组的机制.

    所以Shuffle是一个代价很高的操作,因为它会导致executor和不同的机器节点之间进行数据复制.

    1.4.1 Shuffle简述

      以reduceByKey为例,将原始数据中key相同的记录聚合为一个组.这里挑战是原始数据很可能是存在不同分区不同机器的(参考MapReduce执行过程)

      Spark-Shuffle与MapReduce-Shuffle的区别

        MapReduce-Shuffle结果是分区有序,分区内再按Key排序

        Spark-Shuffle结果是分区有序,但分区内Key无序.

          要对Spark-Shuffle的分区内再排序,有以下方法:

           mapPartitions 在已有的每个分区上再使用.sort排序

           repartitionAndSortWithinPartitions  重建分区,并排序

           sortBy提前对RDD本身做一个全范围排序

    1.4.2 RDD中引起Shuffle的操作

       repartition类操作 例如:repartitioncoalesce

       _ByKey操作(除了counting相关操作)例如:groupByKeyreduceByKey

       join 例如:cogroupjoin

      1.4.3 Shuffle的性能影响

      Shuffle本身是同时高耗内存,高耗磁盘IO,高耗网络IO的昂贵操作.

        Spark会启动一系列的MapReduce(Hadoop MapReduce),产生大量的数据缓冲区与归并排序,大量的pill文件与归并Merge等等

以上是关于[Spark]-RDD详解之变量&操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark算子执行流程详解之七

Spark函数详解系列之RDD基本转换

Spark存储系统详解

spark之RDD详解----五大特性

大数据之Spark:Spark Core

Spark Streaming源码解读之No Receivers详解