spark-day04-依赖关系-持久化-分区器-数据结构

Posted 总会有天明

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark-day04-依赖关系-持久化-分区器-数据结构相关的知识,希望对你有一定的参考价值。

一:依赖关系

1:依赖和血缘关系介绍

        rdd.todebugstring:打印血缘关系

        rdd.dependencies:打印依赖关系

2:保存血缘关系

 3:OneToOne依赖---窄依赖

 4:shuffle依赖--宽依赖

        新的RDD的一个分区的数据依赖于旧的RDD多个分区的数据,这个依赖称之为shuffle依赖。

5:窄依赖的任务

 6:宽依赖的任务

 7:任务分类

1: 一个main方法里面可能有多个行动算子,比如collect,所以会有多个job

2:一个job可能会有多个阶段,比如上图宽依赖

3:一个阶段可能会有多个task,比如上图一个阶段中的多个分区

 二:持久化

1:RDD自身并不会保存数据,重复读取对象

 2:引入持久化进行优化(文件、内存均可)

3:持久化操作必须在行动算子执行时完成的。不然没有数据,没办法进行持久化。 

4:RDD对象的持久化操作并不一定是为了重用,在数据执行较长,或数据比较重要的场合也可以采用持久化操作。

5:CheckPoint检查点

所谓的检查点,就是通过将RDD中间结果写入磁盘。

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行action操作才能触发。

6: 缓存和检查点的区别

1:cache缓存只是将数据保存起来,不切断血缘依赖。checkpoint检查点切断血缘依赖。

2:cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。checkpoint的数据通常存储在hdfs等容错、高可用的文件系统,可靠性高。

3:建议对checkpoin的rdd使用cache缓存,这样checkpoint的job只需从cache缓存中读取数据即可,否则需要再从头计算一次rdd

cache:将数据临时存储在内存中进行数据重用

                会在血缘关系中添加新的依赖。一旦出现问题,可以重头读取数据。

persist:将数据临时存储在磁盘文件中进行数据重用

                涉及到磁盘IO,性能较低,但是数据安全

                如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint:将数据长久的保存在磁盘文件中进行数据重用

                涉及到磁盘IO,性能较低,但是数据安全

                为了保证数据安全,所以一般情况下,会独立执行作业

                为了能够提高效率,一般情况下,是需要和cache联合使用

                执行过程中,会切断血缘关系,重新建立新的血缘关系。因为保存的数据比较安全,所以就是数据源的保存地址发生了改变。导致血缘关系发生改变。

三:分区器

1:自定义分区器:根据设置的规则,将同一规则的数据放在同一分区内

package com.atguigu.bigdata.spark.rdd.part

import org.apache.spark.Partitioner, SparkConf, SparkContext

object Spark01_RDD_Part 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      ("nba","************"),
      ("cba","************"),
      ("wnba","************"),
      ("nba","************")
    ),3)

    val value = rdd.partitionBy(new MyPartitioner)
    value.saveAsTextFile("output")
    sc.stop()
  

  class MyPartitioner extends Partitioner
    //分区数量
    override def numPartitions: Int = 3

    //根据数据的key值,返回数据的分区索引(从0开始)
    override def getPartition(key: Any): Int = 
      key match 
        case "nba" => 0
        case "wnba" => 1
        case _ => 2
      
    
  

四:文件的读取与保存

1:保存

package com.atguigu.bigdata.spark.rdd.IO

import org.apache.spark.SparkConf, SparkContext

object Spark01_RDD_IO_Save 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(
      List(
        ("a",1),
        ("b",2),
        ("c",3)
      )
    )
    rdd.saveAsTextFile("output1")
    rdd.saveAsObjectFile("output2")
    rdd.saveAsSequenceFile("output3")

    sc.stop()
  

2:读取

package com.atguigu.bigdata.spark.rdd.IO

import org.apache.spark.SparkConf, SparkContext

object Spark02_RDD_IO_Load 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile("output1")
    println(rdd.collect().mkString(","))

    val rdd1 = sc.objectFile[(String,Int)]("output2")
    println(rdd1.collect().mkString(","))

    val rdd2 = sc.sequenceFile[String,Int]("output3")
    println(rdd2.collect().mkString(","))

    sc.stop()
  

五:数据结构--累加器(分布式的共享只写变量)

1:概念

累加器用来将executor端变量信息聚合到driver端。在driver程序中定义的变量,在executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge

package com.atguigu.bigdata.spark.acc

import org.apache.spark.SparkConf, SparkContext

object Spark01_Acc 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1,2,3,4))

    //获取系统累加器,spark默认提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")
    rdd.foreach(
      num => 
        sumAcc.add(num)
      
    )

    println(sumAcc.value)
    sc.stop()
  

 2:累加器的少加和多加

package com.atguigu.bigdata.spark.acc

import org.apache.spark.SparkConf, SparkContext

object Spark02_Acc 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1,2,3,4))

    //获取系统累加器,spark默认提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")
    val mapRDD = rdd.map(
      num => 
        sumAcc.add(num)
        num
      
    )

    //少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    mapRDD.collect()
    mapRDD.collect()
    //多加:多次执行
    println(sumAcc.value)
    sc.stop()
  

3:自定义累加器

package com.atguigu.bigdata.spark.acc

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.SparkConf, SparkContext

object Spark03_Acc 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List("hello","spark","hello"))

    //累加器:word count
    //创建累加器对象
    val wcAcc = new MyAccumulator()
    //向spark进行注册
    sc.register(wcAcc,"wordCountAcc")
    rdd.foreach(
      word => 
        //数据的累加(使用累加器)
        wcAcc.add(word)
      
    )
    println(wcAcc.value)
    sc.stop()
  

  /*
 自定义累加器
 1.继承:AccumulatorV2 定义泛型
    IN:累加器输入的数据类型
    OUT:累加器返回的数据类型

    2.重写方法
   */
  class MyAccumulator extends AccumulatorV2[String,Map[String,Long]] 

    private var wcMap = Map[String,Long]()

    //判断是否初始状态
    override def isZero: Boolean = 
      wcMap.isEmpty
    

    override def copy(): AccumulatorV2[String, Map[String, Long]] = 
      new MyAccumulator()
    

    override def reset(): Unit = 
      wcMap.clear()
    

    //获取累加器需要计算的值
    override def add(word: String): Unit = 
      val newCnt = wcMap.getOrElse(word,0L) + 1
      wcMap.updated(word,newCnt)
    

    //driver合并多个累加器
    override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = 
      val map1 = this.wcMap
      val map2 = other.value
      map2.foreach
        case (word,count) => 
          val newCount = map1.getOrElse(word,0L) + count
          map1.updated(word,newCount)
        
      
    

    //累加器结果
    override def value: Map[String, Long] = 
      wcMap
    
  


六:广播变量

        Task的量,是由driver的分区数决定的,和executor的个数无关

        转换为

       只能访问不能修改

        spark中的广播变量就可以将闭包的数据保存到executor的内存中,不能进行更改。

package com.atguigu.bigdata.spark.acc

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object Spark04_Bc 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(List(
      ("a",1),("b",2),("c",3)
    ))

    /*val rdd2 = sc.makeRDD(List(
      ("a",4),("b",5),("c",6)
    ))

    //join会导致数据量几何增长,并且会影响shuffle大的性能,不推荐使用
    val value:RDD[(String,(Int,Int))] = rdd1.join(rdd2)

    value.collect().foreach(println)*/

    /*val map = mutable.Map(("a",4),("b",5),("c",6))
    rdd1.map
      case (w,c) => 
        val l:Int = map.getOrElse(w,0)
        (w,(c,l))
      
    .collect().foreach(println)*/

    val map = mutable.Map(("a",4),("b",5),("c",6))
    //封装广播变量
    val bc:Broadcast[mutable.Map[String,Int]] = sc.broadcast(map)
    rdd1.map
      case (w,c) => 
        //访问广播变量
        val l:Int = bc.value.getOrElse(w,0)
        (w,(c,l))
      
    .collect().foreach(println)

    sc.stop()
  


 

        

以上是关于spark-day04-依赖关系-持久化-分区器-数据结构的主要内容,如果未能解决你的问题,请参考以下文章

RDD的分区依赖关系机制

04 Spring的依赖注入

类型安全激活器依赖错误

Spark-序列化依赖关系持久化

Ubuntu18.04boot空间不足,删除旧内核又显示有未满足的依赖关系?

Spark DAG 依赖关系 Stage