Spark RDD 详细介绍
Posted 常生果
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RDD 详细介绍相关的知识,希望对你有一定的参考价值。
RDD(Resilent Distributed Datasets)俗称弹性分布式数据集,是Spark 底层的分布式存储的数据结构,可以说是 Spark 的核心, Spark API 的所有操作都是基于 RDD 的. 有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。RDD只是数据集的抽象,分区内部并不会存储具体的数据。
RDD数据不只存储在一台机器上,而是分布在多台机器上,实现数据计算的并行化.弹性表明数据丢失时,可以进行重建.在Spark 1.5版以后,新增了数据结构 Spark-DataFrame,仿造的 R 和 python 的类 SQL 结构-DataFrame, 底层为 RDD, 能够让数据从业人员更好的操作 RDD.
在Spark 的设计思想中,为了减少网络及磁盘 IO 开销,需要设计出一种新的容错方式,于是才诞生了新的数据结构 RDD. RDD 是一种只读的数据块,可以从外部数据转换而来,你可以对RDD 进行函数操作(Operation),包括 Transformation 和 Action. 在这里只读表示当你对一个 RDD 进行了操作,那么结果将会是一个新的 RDD, 这种情况放在代码里,假设变换前后都是使用同一个变量表示这一 RDD,RDD里面的数据并不是真实的数据,而是一些元数据信息,记录了该 RDD 是通过哪些 Transformation 得到的,在计算机中使用 lineage 来表示这种血缘结构,lineage 形成一个有向无环图 DAG, 整个计算过程中,将不需要将中间结果落地到 HDFS 进行容错,加入某个节点出错,则只需要通过 lineage 关系(血统关系图)重新计算即可.
RDD之所以称之抽象数据集,是因为rdd存放的不是实际数据集,而是数据转化的步骤规则,简称DAG
DAG真正被触发执行实际的数据处理,需要一个导火索,该导火索就是Action计算子,才会被真正唤醒。
高容错性主要是数据以Hadoop作为数据源存储,Hadoop是数据副本
RDD 主要具有如下特点:
- 1.它是在集群节点上的不可变的、已分区的集合对象;
- 2.通过并行转换的方式来创建(如 Map、 filter、join 等);
- 3.失败自动重建;
- 4.可以控制存储级别(内存、磁盘等)来进行重用;
- 5.必须是可序列化的;
- 6.是静态类型的(只读)。
-
有一个分片列表,就是能被切分,和hadoop一样,能够切分的数据才能并行计算。由一个函数计算每一个分片。
RDD 的创建方式主要有2种:
1,并行化(Parallelizing)一个已经存在与驱动程序(Driver Program)中的集合如set、list;
2,读取外部存储系统上的一个数据集,比如HDFS、Hive、HBase,或者任何提供了Hadoop InputFormat的数据源.也可以从本地读取 txt、csv 等数据集
注意:在读取二进制文件时候,如果文件是空数据,则rdd会报错
RDD 的操作函数(operation)主要分为2种类型 Transformation 和 Action.
类别 | 函数 | 区别 |
---|---|---|
Transformation | Map,filter,groupBy,join, union,reduce,sort,partitionBy | 返回值是RDD ,不 提交 Spark 集群运行 |
Action | count,collect,take,save, show | 返回值不是RDD ,会形成 DAG 图,提交 Spark 集群运行 并立即返回结果 |
Transformation 不触发提交作业,完成作业中间处理过程
Spark 遇到 Transformation 操作只记录操作,并去执行,需要等到有 Action 操作的时候才会真正启动计算过程进行计算.
针对每个 Action,Spark 会生成一个 Job, 从数据的创建开始,经过 Transformation, 结尾是 Action 操作.这些操作对应形成一个有向无环图(DAG),形成 DAG 的先决条件是最后的函数操作是一个Action.
Spark RDD 算子介绍
Transformation算子
不触发提交作业,完成作业中间处理过程。
[1]Value数据类型
输入分区与输出分区一对一型:map、flatMap、mapPartitions、glom
输入分区与输出分区多对一型:union、cartesian
输入分区与输出分区一对多型:groupBy
输出分区为输入分区子集型:filter、distinct、subtract、sample、takeSample
Cache型:cache、persist
[2]Key-Value数据类型
一对一:mapValues
聚集:combineByKey、reduceByKey、partitionBy、cogroup
连接:join、leftOutJoin、rightOutJoin
Action算子
会触发SparkContext提交Job作业。
无输出类型:foreach
HDFS类型:saveAsTextFile、saveAsObjectFile
Scala类型:collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate
RDD可以相互依赖。
若RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为narrow dependency;若多个Child RDD分区都可以依赖,则称之为wide dependency。
不同操作依据其特性,可能会产生不同的依赖,例如map操作会产生narrow dep,而join操作则产生wide dep。
例子:
val arr = Array("cat", "dog", "lion", "monkey", "mouse")
// create RDD by collection
val rdd = sc.parallize(arr)
// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
Map(x => (x._1, x._2.toList.length))
val result = rdd2.collect()
print(result)
// output:Array((d,1), (l,1), (m,2))
首先,当你在解释器里一行行输入的时候,实际上 Spark 并不会立即执行函数,而是当你输入了val result = rdd2.collect()
的时候, Spark 才会开始计算,从 sc.parallize(arr)
到最后的 collect
,形成一个 Job.
以上是关于Spark RDD 详细介绍的主要内容,如果未能解决你的问题,请参考以下文章