Spark基础学习笔记21:RDD检查点与共享变量
Posted howard2005
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark基础学习笔记21:RDD检查点与共享变量相关的知识,希望对你有一定的参考价值。
文章目录
零、本讲学习目标
- 理解RDD检查点机制的特点与用处
- 理解共享变量的类别、特点与使用
一、RDD检查点
(一)RDD检查点机制
- RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。
(二)与RDD持久化的区别
- cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。
- 在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。
(三)RDD检查点案例演示
- 在
net.huawei.rdd
包里创建CheckpointDemo
对象
package net.huawei.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
/**
* 功能:RDD检查点演示
* 作者:华卫
* 日期:2022年04月16日
*/
object CheckpointDemo
def main(args: Array[String]): Unit =
// 设置系统属性(本地运行必须设置,否则无权访问HDFS)
System.setProperty("HADOOP_USER_NAME", "root")
// 创建SparkConf对象
val conf = new SparkConf()
// 设置应用程序名称,可在Spark WebUI里显示
conf.setAppName("Spark-CheckpointDemo")
// 设置集群Master节点访问地址
conf.setMaster("local[2]")
// 设置测试内存
conf.set("spark.testing.memory", "2147480000")
// 基于SparkConf对象创建SparkContext对象,该对象是提交Spark应用程序的入口
val sc = new SparkContext(conf)
// 设置检查点数据存储路径
sc.setCheckpointDir("hdfs://master:9000/spark-ck")
// 创建模拟数据RDD
val rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))
// 过滤结果
val resultRDD = rdd.filter(_ >= 5)
// 持久化RDD到内存中
resultRDD.cache()
// 将resultRDD标记为检查点
resultRDD.checkpoint()
// 第一次行动算子计算时,将把标记为检查点的RDD数据存储到文件系统指定路径中
val result: String = resultRDD.collect().mkString(", ")
println(result)
// 第二次行动算子计算时,将直接从文件系统读取resultRDD数据,而不需要从头计算
val count = resultRDD.count()
println(count)
// 停止Spark容器
sc.stop()
- 上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。
- Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。
- 创建检查点保存数据的目录
- 运行程序,在控制台查看结果
- 查看HDFS检查点目录,执行命令:
hdfs dfs -ls -R /spark-ck
二、共享变量
- 通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个
外部变量
,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。
(一)广播变量
- 广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的。
1、默认情况下变量的传递
- map()算子传入的函数中使用外部变量arr
scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()
- 上述代码中,传递给map()算子的函数
(_, arr)
会被发送到Executor端执行,而变量arr
将发送到Worker节点
的所有Task任务
中。变量arr传递的流程如下图所示。
- 假设变量
arr
存储的数据量大小有100MB
,则每一个Task任务都需要维护100MB
的副本,若某一个Executor中启动了3
个Task任务,则该Executor将消耗300MB
内存。
2、使用广播变量时变量的传递
- 广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的
value
方法访问广播变量的值
- 使用广播变量将数组arr传递给map()算子
scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()
- 上述代码使用
broadcast()
方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar
,该变量是一个org.apache.spark.broadcast.Broadcast
对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。使用广播变量进行变量传递的流程如下图所示。
- Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。
- 输出result的数据
(二)累加器
1、累加器功能
- 累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。
2、不使用累加器
- 对一个整型数组求和
- 上述代码由于
sum
变量在Driver
中定义,而累加操作sum = sum + x
会发送到Executor
中执行,因此输出结果不正确。
3、使用累加器
- 对一个整型数组求和
scala> val myacc = sc.longAccumulator("My Accumulator") // 声明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端输出结果
- 上述代码通过调用SparkContext对象的
longAccumulator ()
方法创建了一个Long
类型的累加器,默认初始值为0
。也可以使用doubleAccumulator()
方法创建Double
类型的累加器。 - 累加器只能在
Driver端定义
,在Executor端更新
。Executor端不能读取累加器的值,需要在Driver端
使用value
属性读取。
以上是关于Spark基础学习笔记21:RDD检查点与共享变量的主要内容,如果未能解决你的问题,请参考以下文章
学习笔记Spark—— Spark编程基础(创建RDDRDD算子文件读取与存储)