SparkSpark Core Day04

Posted Maynor大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSpark Core Day04相关的知识,希望对你有一定的参考价值。

Spark Day04:Spark Core

02-[了解]-今日课程内容提纲

主要讲解RDD函数,分为2类:Transformation转换函数和Action触发函数

RDD中函数:
	- 函数分类,不同类型函数功能
	- 常见函数概述
	- 5种类型RDD函数
		实际项目中使用最多的,必须要掌握
	- RDD 持久化函数
		可以将RDD分布式集合数据进行缓存,比如缓存到Executor内存中,再次处理数据时,直接从内存读取
	- RDD Checkpoint
		将RDD数据保存到可靠文件系统中,比如HDFS

首先创建Maven Module模块,编写好代码模块,讲解某个知识点时,在编写核心代码

03-[掌握]-RDD 函数分类

RDD 的操作主要可以分为 TransformationAction 两种。

  • Transformation 转换,将1个RDD转换为另一个RDD
  • Action 触发,当1个RDD调用函数以后,触发一个Job执行(调用Action函数以后,返回值不是RDD)

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

RDD中2种类型操作函数:Transformation(lazy)和Action(eager)函数

  • Transformation转换函数

  • Action触发函数,触发一个Job执行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a1fQcH5e-1638793130130)(/img/image-20210422150349862.png)]

04-[了解]-RDD 中常见函数概述

RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。

主要常见使用函数如下,每个函数通过演示范例讲解。

1、分区操作函数
	对RDD中每个分区数据进行操作
	
2、重分区函数
	调整RDD中分区数目,要么变大,要么变小

3、聚合函数
	对RDD中数据进行聚合统计,比如使用reduce、redueBykey等
	
4、关联函数
	对2个RDD进行JOIN操作,类似SQL中JOIN,分为:等值JOIN、左外连接和右外连接、全外连接fullOuterJoin

RDD函数练习:运行spark-shell命令行,在本地模式运行,执行函数使用

05-[掌握]-RDD 函数之基本函数使用

​ RDD中map、filter、flatMap及foreach等函数为最基本函数,都是对RDD中每个元素进行操作,将元素传递到函数中进行转换

编写词频统计WordCount程序,使用基本函数

package cn.itcast.spark.func.basic

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

/**
 * 演示RDD中基本函数使用
 */
object _01SparkBasicTest 
	
	def main(args: Array[String]): Unit = 
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = 
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		
		
		// step1. 读取数据
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount/input.data", minPartitions = 2)
		
		// step2. 处理数据
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 过滤数据
			.filter(line => null != line && line.trim.length > 0)
			// 分割单词
			.flatMap(line => line.trim.split("\\\\s+"))
			// 转换为二元组
			.map(word => word -> 1)
			// 按照单词分组,对组内数据进行聚合求和
			.reduceByKey((tmp, item) => tmp + item) // TODO: 隐式转换,将RDD对象抓好为PairRDDFunctions对象,调用方法
		
		// step3. 输出数据
		resultRDD.foreach(item => println(item))
		
		// 应用结束,关闭资源
		sc.stop()
	
	

06-[掌握]-RDD 函数之分区操作函数

每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替foreach函数使用foreachPartition代替

前面编写WordCount词频统计代码中,使用map函数和forearch函数,针对RDD中每个元素操作,并不是针对每个分区数据操作的,如果针对分区操作:mapPartitions和foreachPartition

针对分区数据进行操作时,函数的参数类型:迭代器Iterator,封装分区中所有数据

针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下:

package cn.itcast.spark.func.iter

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object _02SparkIterTest 
	
	def main(args: Array[String]): Unit = 
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = 
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		
		
		// step1. 读取数据
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
		
		// step2. 处理数据
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 过滤数据
			.filter(line => line.trim.length != 0 )
			// 对每行数据进行单词分割
			.flatMap(line => line.trim.split("\\\\s+"))
			// 转换为二元组
    		//.map(word => word -> 1)
			/*
			  def mapPartitions[U: ClassTag](
			      f: Iterator[T] => Iterator[U],
			      preservesPartitioning: Boolean = false
			  ): RDD[U]
			 */
    		.mapPartitions(iter => iter.map(word => (word, 1)))
			// 分组聚合
			.reduceByKey((tmp, item) => tmp + item)
		
		// step3. 输出数据
		//resultRDD.foreach(item => println(item))
		/*
		  def foreachPartition(f: Iterator[T] => Unit): Unit
		 */
		resultRDD.foreachPartition(iter => iter.foreach(item => println(item)))
		
		// 应用结束,关闭资源
		sc.stop()
	
	

为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???

07-[掌握]-RDD 函数之重分区函数

如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。

上述2个函数最为关键:
	- 增加RDD分区数目:repartition
	- 减少RDD分区数目:coalesce,不产生Shuffle
package cn.itcast.spark.func.iter

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object _02SparkPartitionTest 
	
	def main(args: Array[String]): Unit = 
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = 
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		
		
		// step1. 读取数据
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
		println(s"raw rdd partitions = $inputRDD.getNumPartitions")
		
		// TODO: 增加RDD分区数目
		val etlRDD: RDD[String] = inputRDD.repartition(3)
		println(s"etl rdd partitions = $etlRDD.getNumPartitions")
		
		// step2. 处理数据
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 过滤数据
			.filter(line => line.trim.length != 0 )
			// 对每行数据进行单词分割
			.flatMap(line => line.trim.split("\\\\s+"))
			// 转换为二元组
    		//.map(word => word -> 1)
			/*
			  def mapPartitions[U: ClassTag](
			      f: Iterator[T] => Iterator[U],
			      preservesPartitioning: Boolean = false
			  ): RDD[U]
			 */
    		.mapPartitions(iter => iter.map(word => (word, 1)))
			// 分组聚合
			.reduceByKey((tmp, item) => tmp + item)
		
		// step3. 输出数据
		//resultRDD.foreach(item => println(item))
		/*
		  def foreachPartition(f: Iterator[T] => Unit): Unit
		 */
		// TODO: 降低结果RDD分区数目
		val outputRDD: RDD[(String, Int)] = resultRDD.coalesce(1)
		println(s"output rdd partitions = $outputRDD.getNumPartitions")
		outputRDD.foreachPartition(iter => iter.foreach(item => println(item)))
		
		// 应用结束,关闭资源
		sc.stop()
	
	

在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????

08-[掌握]-RDD 函数之RDD 中聚合函数

​ 回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列表List中聚合函数reduce和fold源码如下:

通过代码,看看列表List中聚合函数使用:

运行截图如下所示:

fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:

在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:

案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:

运行结果解析如下:

查看RDD中高级聚合函数aggregate,函数声明如下:

业务需求:对RDD中数据进行求和sum。

		// TODO:aggregate函数,累计求和
		/*
		def aggregate[U: ClassTag]
		(zeroValue: U)
		(
		   seqOp: (U, T) => U,
		   combOp: (U, U) => U
		): U
		 */
		val aggSum: Int = datasRDD.aggregate(0)(
			// seqOp: (U, T) => U    分区内数据聚合
			(tmp: Int, item: Int) => 
				println(s"seq -> p-$TaskContext.getPartitionId(): tmp = $tmp, item = $item, sum = $tmp + item")
				tmp + item
			,
			// combOp: (U, U) => U    分区间数据聚合
			(tmp, item) => 
				println(s"comb -> p-$TaskContext.getPartitionId(): tmp = $tmp, item = $item, sum = $tmp + item")
				tmp + item
			
		)
		println(s"aggSum = $aggSum")

09-[掌握]-RDD 函数之PairRDDFunctions 聚合函数

​ 在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。

*ByKey函数将相同Key的Value进行聚合操作的,省去先分组再聚合。

  • 第一类:分组函数groupByKey

  • 第二类:分组聚合函数reduceByKey和foldByKey

  • 第三类:分组聚合函数aggregateByKey

​ 在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。

10-[掌握]-RDD 函数之关联JOIN函数

当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。

RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:

具体看一下join(等值连接)函数说明:

范例演示代码:

package cn.itcast.spark.func.join

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

/**
 * RDD中关联函数Join,针对RDD中数据类型为Key/Value对
 */
object _04SparkJoinTest 
	
	def main(args: Array[String]): Unit = 
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = 
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		
		
		// 模拟数据集
		val empRDD: RDD[(Int, String)] = sc.parallelize(
			Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu"))
		)
		val deptRDD: RDD[(Int, String)] = sc.parallelize(
			Seq((1001, "sales"), (1002, "tech"))
		)
		
		// TODO: 等值连接
		//                deptno  empname  deptname
		val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
		joinRDD.foreachcase (deptno, (empname, deptname)) =>
			println(s"deptno = $deptno, empname = $empname, deptname = $deptname")
		
		
		println("======================================================")
		// TODO:左外连接
		val leftRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
		leftRDD.foreachcase (deptno, (empname, option)) =>
			val deptname: String = option match 
				case Some(name) => name
				case None => "未知"
			
			println(s"deptno = $deptno, empname = $empname, deptname = $deptname")
		
		
		// 应用结束,关闭资源
		sc.stop()
	
	

11-[掌握]-RDD 持久化

​ 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

将RDD数据进行缓存时,本质上就是将RDD各个分区数据进行缓存

  • 缓存函数

可以将RDD数据直接缓存到内存中,函数声明如下:

​ 但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bxNtlFD7-1638793130145)(/img/image-20210422172215367.png)]

  • 缓存级别

在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:

实际项目中缓存数据时,往往选择如下两种级别:

缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发

  • 释放缓存

缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:

此函数属于eager,立即执行。

  • 何时缓存数据

在实际项目开发中,什么时候缓存RDD数据,最好呢???

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKk5WJgJ-1638793130147)(img/image-20210422172821282.png)]

12-[了解]-RDD Checkpoint

​ RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

​ 在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;

案例演示代码如下:

package cn.itcast.spark.ckpt

import org.apache.spark.SparkConf, SparkContext

/**
 * RDD数据Checkpoint设置,案例演示
 */
object _06SparkCkptTest 
	
	def main(args: Array[String]): Unit = 
		// 创建应用程序入口SparkContext实例对象
		val sc: SparkContext = 
			// 1.a 创建SparkConf对象,设置应用的配置信息
			val sparkConf: SparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// 1.b 传递SparkConf对象,构建Context实例
			new SparkContext(sparkConf)
		
		
		// TODO: 设置检查点目录,将RDD数据保存到那个目录
		sc.setCheckpointDir("datas/ckpt/")
		
		// 读取文件数据
		val datasRDD = sc.textFile("datas/wordcount.data")
		
		// TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
		datasRDD.checkpoint()
		datasRDD.count()
		
		
		// TODO: 再次执行count函数, 此时从checkpoint读取数据
		println(datasRDD.count())
		
		
		// 应用程序运行结束,关闭资源
		Thread.sleep(1000000000)
		sc.stop()
	
	


面试题:持久化和Checkpoint的区别:

以上是关于SparkSpark Core Day04的主要内容,如果未能解决你的问题,请参考以下文章

markdown [Apereo CAS 3.5 CORE] Apereo CAS 3.5 #CAS的核心代码片段

如何使用模块化代码片段中的LeakCanary检测内存泄漏?

django.core.exceptions.ImproperlyConfigured: Requested setting DEFAULT_INDEX_TABLESPACE的解决办法(转)(代码片段

django.core.exceptions.ImproperlyConfigured: Requested setting DEFAULT_INDEX_TABLESPACE的解决办法(转)(代码片段

day06-jsp

SparkSpark的一个案例 Encountered removing nulls from dataset or using handleInvalid = “keep“ or “skip“(代码