3天掌握Spark--RDD概念及WordCount案例
Posted 一只楠喃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3天掌握Spark--RDD概念及WordCount案例相关的知识,希望对你有一定的参考价值。
Spark之RDD
RDD 概念
对于大量的数据,Spark 在内部保存计算的时候,都是用一种叫做弹性分布式数据集(ResilientDistributed Datasets,RDD)的数据结构来保存的,所有的运算以及操作都建立在 RDD 数据结构的基础之上
在Spark框架中,将数据封装到集合中:RDD
,如果要处理数据,调用集合RDD中函数即可。
(论文找不到想看的私我私我私我)
也就是说RDD设计的核心点为:
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BLBQnFZj-1621602042472)(/img/image-20210421105136603.png)]
拆分核心要点三个方面:
可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类AbstractClass和泛型
Generic Type
:
RDD弹性分布式数据集核心点示意图如下:
RDD 概念之5大特性
RDD 数据结构内部有五个特性(摘录RDD 源码):前3个特性,必须包含的;后2个特性,可选的。
第一个:a list of partitions
-
第二个:A function for computing each split
-
第三个:A list of dependencies on other RDDs
-
在RDD类中,对应一个方法:
第四个:Optionally, a Partitioner for key-value RDDs
-
第五个:Optionally, a list of preferred locations to compute each split on
-
对数据计算时,考虑数据本地行,数据在哪里,尽量将Task放在哪里,快速读取数据进行处理
-
RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来、如何计算,主要属性包括五个方面(必须牢记,通过编码加深理解,面试常问):
词频统计WordCount中RDD
以词频统计WordCount程序为例,查看整个Job中各个RDD类型及依赖关系,WordCount程序
运行程序结束后,查看WEB UI监控页面,此Job(RDD调用foreach触发)执行DAG图
RDD 创建的两种方式
如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合
(Driver Program中)和引用加载外部存储系统
(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。
-
并行化集合:
- 由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。
只能将Scala中Seq对象或者子类对象,并行化RDD
/**
* Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD
* - 将Scala集合转换为RDD
* sc.parallelize(seq)
* - 将RDD转换为Scala中集合
* rdd.collect() -> Array
* rdd.collectAsMap() - Map,要求RDD数据类型为二元组
*/
object _01SparkParallelizeTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
// sparkConf对象
val sparkConf = new SparkConf()
// _01SparkParallelizeTest$ ->(.stripSuffix("$")) -> _01SparkParallelizeTest
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// sc 实例对象
SparkContext.getOrCreate(sparkConf)
}
// TODO: 1、Scala中集合Seq序列存储数据
val linesSeq: Seq[String] = Seq(
"hadoop scala hive spark scala sql sql", //
"hadoop scala spark hdfs hive spark", //
"spark hdfs spark hdfs scala hive spark" //
)
// TODO: 2、并行化集合
val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
// TODO: 3、词频统计
val resultRDD = inputRDD
.flatMap(line => line.split("\\\\s+"))
.map(word => (word, 1))
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4、输出结果
resultRDD.foreach(println)
// 应用结束,关闭资源
sc.stop()
}
}
- 外部存储系统
实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。
实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS上Block块数目。
创建RDD时小文件读取
在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:
wholeTextFiles
类,专门读取小文件数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5ISlS434-1621602465191)(/img/image-20210421114928772.png)]
范例演示:读取100个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。
/**
* 采用SparkContext#wholeTextFiles()方法读取小文件
*/
object _02SparkWholeTextFileTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
// sparkConf对象
val sparkConf = new SparkConf()
// _01SparkParallelizeTest$ ->(.stripSuffix("$")) -> _01SparkParallelizeTest
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// sc 实例对象
SparkContext.getOrCreate(sparkConf)
}
/*
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions
): RDD[(String, String)]
Key: 每个小文件名称路径
Value:每个小文件的内容
*/
val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)
println(s"RDD 分区数目 = ${inputRDD.getNumPartitions}")
inputRDD.take(2).foreach(tuple => println(tuple))
// 应用结束,关闭资源
sc.stop()
}
}
点个赞嘛!
以上是关于3天掌握Spark--RDD概念及WordCount案例的主要内容,如果未能解决你的问题,请参考以下文章