Spark核心-RDD

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark核心-RDD相关的知识,希望对你有一定的参考价值。

参考技术A

RDD是Spark中的数据抽象,全称 弹性分布式数据集(Resilient Distributed Datasets) 。RDD可以理解为将一个大的数据集合以分布式的形式保存在集群服务器的内存中。RDD是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。

RDD是Spark的核心,也是整个Spark的架构基础。

RDD的特点:

RDD的5个主要属性:

可以通过两种方式创建RDD:

转换操作指的是在原RDD实例上进行计算,然后创建一个新的RDD实例。

RDD中的所有的转换操作都是 惰性 的,在执行RDD的转换操作的时候,并不会直接计算结果,而是记住这些应用到基础数据集上的转换动作,只有行动操作时,这些转换才会真正的去执行。这样设计的好处是更加有效率的运行。

行动操作指的是向驱动器程序返回结果或把结果写入外部系统的操作。

Spark在调用RDD的行动操作的时候,会触发Spark中的连锁反应。当调用的行动操作的时候,Spark会尝试创建作为调用者的RDD。如果这个RDD是从文件中创建的,那么Spark会在worker节点上读取文件至内存中。如果这个RDD是通过其他RDD的转换得到的,Spark会尝试创建其父RDD。这个过程会一直持续下去,直到Spark找到根RDD。然后Spark就会真正执行这些生成RDD所必须的转换计算。最后完成行动操作,将结果返回给驱动程序或者写入外部存储。

Spark速度非常快的原因之一,就是在不同操作中在内存中持久化一个数据集。当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行的其他动作中重用。这使得后续的动作变得更加迅速。缓存是Spark构建迭代算法和快速交互式查询的关键。所以我们在开发过程中,对经常使用的RDD要进行缓存操作,以提升程序运行效率。

RDD缓存的方法

RDD类提供了两种缓存方法:

cache方法其实是将RDD存储在集群中Worker的内存中。

persist是一个通用的cache方法。它可以将RDD存储在内存中或硬盘上或者二者皆有。

缓存的容错

缓存是有可能丢失(如机器宕机),或者存储于内存的数据由于内存不足而被删除。RDD的缓存的容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列的转换,丢失的数据会被重新计算。因为RDD的各个Partition是相对独立的,所以在重新计算的时候只需要计算丢失部分Partition即可,不需要重新计算全部的Partition。因此,在一个缓存RDD的节点出现故障的时候,Spark会在另外的节点上自动重新创建出现故障的节点中存储的分区。

RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算特别耗时,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,所以Spark引入了检查点(checkpoint)机制。

缓存是在计算结束后,直接将计算结果通过用户定义的存储级别写入不同的介质。而检查点不同,它是在计算完成后,重新建立一个Job来计算。所以为了避免重复计算,推荐先将RDD缓存,这样在进行检查点操作时就可以快速完成。

Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生动RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。

RDD之间的依赖关系包括:

Spark中的依赖关系主要体现为两种形式:

Spark 核心数据结构:弹性分布式数据集 RDD

大家好,我是勾叔。今天和大家分享Spark 核心数据结构:弹性分布式数据集 RDD。内容分为两块:RDD 的核心概念以及实践环节如何创建 RDD。


RDD 的核心概念
RDD是Spark 最核心的数据结构,RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是 Spark 对数据的核心抽象,也是最关键的抽象,它实质上是一组分布式的 JVM 不可变对象集合,不可变决定了它是只读的,所以 RDD 在经过变换产生新的 RDD 时,(如下图中 A-B),原有 RDD 不会改变。

弹性主要表现在两个方面:

在面对出错情况(例如任意一台节点宕机)时,Spark 能通过 RDD 之间的依赖关系恢复任意出错的 RDD(如 B 和 D 可以算出最后的 RDD),RDD 就像一块海绵一样,无论怎么挤压,都像海绵一样完整;

在经过转换算子处理时,RDD 中的分区数以及分区所在的位置随时都有可能改变。


每个 RDD 都有如下几个成员:

分区的集合;

用来基于分区进行计算的函数(算子);

依赖(与其他 RDD)的集合;

对于键-值型的 RDD 的散列分区器(可选);


如下图所示,RDD_0 根据 HDFS 上的块地址生成,块地址集合是 RDD_0 的成员变量,RDD_1由 RDD_0 与转换(transform)函数(算子)转换而成,该算子其实是 RDD_0 内部成员。从这个角度上来说,RDD_1 依赖于 RDD_0,这种依赖关系集合也作为 RDD_1 的成员变量而保存。

Spark 核心数据结构:弹性分布式数据集 RDD


在 Spark 源码中,RDD 是一个抽象类,根据具体的情况有不同的实现,比如 RDD_0 可以是 MapPartitionRDD,而 RDD_1 由于产生了 Shuffle,则是 ShuffledRDD。

下面我们来看一下 RDD 的源码:
// 表示RDD之间的依赖关系的成员变量@transient private var deps: Seq[Dependency[_]]// 分区器成员变量@transient val partitioner: Option[Partitioner] = None// 该RDD所引用的分区集合成员变量@transient private var partitions_ : Array[Partition] = null// 得到该RDD与其他RDD之间的依赖关系protected def getDependencies: Seq[Dependency[_]] = deps// 得到该RDD所引用的分区protected def getPartitions: Array[Partition]// 得到每个分区地址protected def getPreferredLocations(split: Partition): Seq[String] = Nil// distinct算子def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)}
其中,你需要特别注意这一行代码:
@transient private var partitions_ : Array[Partition] = null

它说明了一个重要的问题,RDD 是分区的集合,本质上还是一个集合,所以在理解时,你可以用分区之类的概念去理解,但是在使用时,就可以忘记这些,把其当做是一个普通的集合。


实践环节:创建 RDD

Spark编程是一件不难的工作,而事实也确实如此。现在我们可以通过已有的 SparkSession 直接创建 RDD。创建RDD的方式有以下几类:通过并行集合创建RDD;从HDFS中加载数据创建RDD;从linux本地文件系统加载数据创建RDD。


了解了RDD的创建方式,接下来,我们逐个进行演示介绍:


通过并行集合创建RDD
这种 RDD 纯粹是为了学习,将内存中的集合变量转换为 RDD,没太大实际意义。
//val spark: SparkSession = .......val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))

从HDFS中加载数据创建RDD
这种生成 RDD 的方式是非常常用的:
//val spark: SparkSession = .......val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")


从HDFS中加载数据创建RDD
Spark 从 MySQL 中读取数据返回的 RDD 类型是 JdbcRDD,顾名思义,是基于 JDBC 读取数据的,这点与 Sqoop 是相似的,但不同的是 JdbcRDD 必须手动指定数据的上下界,也就是以 MySQL 表某一列的最值作为切分分区的依据。
//val spark: SparkSession = .......val lowerBound = 1val upperBound = 1000val numPartition = 10val rdd = new JdbcRDD(spark.sparkcontext,() => { Class.forName("com.mysql.jdbc.Driver").newInstance() DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456") }, "SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?", lowerBound,  upperBound,  numPartition, r => r.getString(1))

既然是基于 JDBC 进行读取,那么所有支持 JDBC 的数据库都可以通过这种方式进行读取,也包括支持 JDBC 的分布式数据库,但是你需要注意的是,从代码可以看出,这种方式的原理是利用多个 Executor 同时查询互不交叉的数据范围,从而达到并行抽取的目的。但是这种方式的抽取性能受限于 MySQL 的并发读性能,单纯提高 Executor 的数量到某一阈值后,再提升对性能影响不大。

上面介绍的是通过 JDBC 读取数据库的方式,对于 HBase 这种分布式数据库来说,情况有些不同,HBase 这种分布式数据库,在数据存储时也采用了分区的思想,HBase 的分区名为 Region,那么基于 Region 进行导入这种方式的性能就会比上面那种方式快很多,是真正的并行导入。
//val spark: SparkSession = .......val sc = spark.sparkcontextval tablename = "your_hbasetable"val conf = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set(TableInputFormat.INPUT_TABLE, tablename)val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) // 利用HBase API解析出行键与列值rdd_three.foreach{case (_,result) => { val rowkey = Bytes.toString(result.getRow) val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))}


值得一提的是 HBase 有一个第三方组件叫 Phoenix,可以让 HBase 支持 SQL 和 JDBC,在这个组件的配合下,第一种方式也可以用来抽取 HBase 的数据,此外,Spark 也可以读取 HBase 的底层文件 HFile,从而直接绕过 HBase 读取数据。说这么多,无非是想告诉你,读取数据的方法有很多,可以根据自己的需求进行选择。

通过第三方库的支持,Spark 几乎能够读取所有的数据源,例如 Elasticsearch,所以你如果要尝试的话,尽量选用 Maven 来管理依赖。

小结
RDD,本质上它可以看成是一个分布式的数据集合,目的就是隔离分布式数据集的复杂性。在实际情况中,大家经常会遇到从外部数据源读取成为RDD,如果理解了读取的本质,那么无论是什么数据源都能够轻松应对了。



推荐阅读:



以上是关于Spark核心-RDD的主要内容,如果未能解决你的问题,请参考以下文章

spark 教程一 RDD和核心概念

Spark编程模型(核心篇 一)

spark核心RDD的概念解析创建以及相关操作

spark系列-2Spark 核心数据结构:弹性分布式数据集 RDD

七spark核心数据集RDD

Spark笔记整理:RDD与spark核心概念名词