3天掌握Spark--RDD概念及WordCount案例

Posted 一只楠喃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3天掌握Spark--RDD概念及WordCount案例相关的知识,希望对你有一定的参考价值。

Spark之RDD

RDD 概念

对于大量的数据,Spark 在内部保存计算的时候,都是用一种叫做弹性分布式数据集(ResilientDistributed Datasets,RDD)的数据结构来保存的,所有的运算以及操作都建立在 RDD 数据结构的基础之上

在Spark框架中,将数据封装到集合中:RDD,如果要处理数据,调用集合RDD中函数即可。
在这里插入图片描述
(论文找不到想看的私我私我私我)
也就是说RDD设计的核心点为:
在这里插入图片描述

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变可分区、里面的元素可并行计算的集合。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BLBQnFZj-1621602042472)(/img/image-20210421105136603.png)]

拆分核心要点三个方面:

在这里插入图片描述

可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类AbstractClass和泛型Generic Type

在这里插入图片描述
RDD弹性分布式数据集核心点示意图如下:
在这里插入图片描述

RDD 概念之5大特性

RDD 数据结构内部有五个特性(摘录RDD 源码):前3个特性,必须包含的;后2个特性,可选的。
在这里插入图片描述
第一个:a list of partitions

词频统计WordCount中RDD

以词频统计WordCount程序为例,查看整个Job中各个RDD类型及依赖关系,WordCount程序
在这里插入图片描述
运行程序结束后,查看WEB UI监控页面,此Job(RDD调用foreach触发)执行DAG图
在这里插入图片描述

RDD 创建的两种方式

如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。
在这里插入图片描述

  • 并行化集合

    • 由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。
    • 在这里插入图片描述
      只能将Scala中Seq对象或者子类对象,并行化RDD
/**
 * Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD
	 * - 将Scala集合转换为RDD
	 *      sc.parallelize(seq)
	 * - 将RDD转换为Scala中集合
		 * rdd.collect() -> Array
		 * rdd.collectAsMap() - Map,要求RDD数据类型为二元组
 */
object _01SparkParallelizeTest {
	
	def main(args: Array[String]): Unit = {
		
		val sc: SparkContext = {
			// sparkConf对象
			val sparkConf = new SparkConf()
				// _01SparkParallelizeTest$  ->(.stripSuffix("$"))   ->  _01SparkParallelizeTest
    			.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    			.setMaster("local[2]")
			// sc 实例对象
			SparkContext.getOrCreate(sparkConf)
		}
		
		// TODO: 1、Scala中集合Seq序列存储数据
		val linesSeq: Seq[String] = Seq(
			"hadoop scala hive spark scala sql sql", //
			"hadoop scala spark hdfs hive spark", //
			"spark hdfs spark hdfs scala hive spark" //
		)
		
		// TODO: 2、并行化集合
		val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
		
		// TODO: 3、词频统计
		val resultRDD = inputRDD
			.flatMap(line => line.split("\\\\s+"))
			.map(word => (word, 1))
			.reduceByKey((tmp, item) => tmp + item)
		
		// TODO: 4、输出结果
		resultRDD.foreach(println)
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
	
}
  • 外部存储系统

实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。

在这里插入图片描述
实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS上Block块数目。

创建RDD时小文件读取

在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5ISlS434-1621602465191)(/img/image-20210421114928772.png)]
在这里插入图片描述
范例演示:读取100个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。

/**
 * 采用SparkContext#wholeTextFiles()方法读取小文件
 */
object _02SparkWholeTextFileTest {
	
	def main(args: Array[String]): Unit = {
		val sc: SparkContext = {
			// sparkConf对象
			val sparkConf = new SparkConf()
				// _01SparkParallelizeTest$  ->(.stripSuffix("$"))   ->  _01SparkParallelizeTest
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// sc 实例对象
			SparkContext.getOrCreate(sparkConf)
		}
		
		/*
		  def wholeTextFiles(
		      path: String,
		      minPartitions: Int = defaultMinPartitions
		  ): RDD[(String, String)]
		  Key: 每个小文件名称路径
		  Value:每个小文件的内容
		 */
		val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)
		
		println(s"RDD 分区数目 = ${inputRDD.getNumPartitions}")
		
		inputRDD.take(2).foreach(tuple => println(tuple))
		
		// 应用结束,关闭资源
		sc.stop()
		
	}
	
}

点个赞嘛!

在这里插入图片描述

以上是关于3天掌握Spark--RDD概念及WordCount案例的主要内容,如果未能解决你的问题,请参考以下文章

3天掌握Spark--RDD 共享变量

3天掌握Spark-- RDD持久化

3天掌握Spark-- RDD Checkpoint

spark之RDD详解----五大特性

一周掌握Flask框架学习笔记Flask概念及基础

一周掌握Flask框架学习笔记Flask概念及基础