Spark核心-RDD
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark核心-RDD相关的知识,希望对你有一定的参考价值。
参考技术ARDD是Spark中的数据抽象,全称 弹性分布式数据集(Resilient Distributed Datasets) 。RDD可以理解为将一个大的数据集合以分布式的形式保存在集群服务器的内存中。RDD是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。
RDD是Spark的核心,也是整个Spark的架构基础。
RDD的特点:
RDD的5个主要属性:
可以通过两种方式创建RDD:
转换操作指的是在原RDD实例上进行计算,然后创建一个新的RDD实例。
RDD中的所有的转换操作都是 惰性 的,在执行RDD的转换操作的时候,并不会直接计算结果,而是记住这些应用到基础数据集上的转换动作,只有行动操作时,这些转换才会真正的去执行。这样设计的好处是更加有效率的运行。
行动操作指的是向驱动器程序返回结果或把结果写入外部系统的操作。
Spark在调用RDD的行动操作的时候,会触发Spark中的连锁反应。当调用的行动操作的时候,Spark会尝试创建作为调用者的RDD。如果这个RDD是从文件中创建的,那么Spark会在worker节点上读取文件至内存中。如果这个RDD是通过其他RDD的转换得到的,Spark会尝试创建其父RDD。这个过程会一直持续下去,直到Spark找到根RDD。然后Spark就会真正执行这些生成RDD所必须的转换计算。最后完成行动操作,将结果返回给驱动程序或者写入外部存储。
Spark速度非常快的原因之一,就是在不同操作中在内存中持久化一个数据集。当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行的其他动作中重用。这使得后续的动作变得更加迅速。缓存是Spark构建迭代算法和快速交互式查询的关键。所以我们在开发过程中,对经常使用的RDD要进行缓存操作,以提升程序运行效率。
RDD缓存的方法
RDD类提供了两种缓存方法:
cache方法其实是将RDD存储在集群中Worker的内存中。
persist是一个通用的cache方法。它可以将RDD存储在内存中或硬盘上或者二者皆有。
缓存的容错
缓存是有可能丢失(如机器宕机),或者存储于内存的数据由于内存不足而被删除。RDD的缓存的容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列的转换,丢失的数据会被重新计算。因为RDD的各个Partition是相对独立的,所以在重新计算的时候只需要计算丢失部分Partition即可,不需要重新计算全部的Partition。因此,在一个缓存RDD的节点出现故障的时候,Spark会在另外的节点上自动重新创建出现故障的节点中存储的分区。
RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算特别耗时,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,所以Spark引入了检查点(checkpoint)机制。
缓存是在计算结束后,直接将计算结果通过用户定义的存储级别写入不同的介质。而检查点不同,它是在计算完成后,重新建立一个Job来计算。所以为了避免重复计算,推荐先将RDD缓存,这样在进行检查点操作时就可以快速完成。
Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生动RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。
RDD之间的依赖关系包括:
Spark中的依赖关系主要体现为两种形式:
Spark 核心数据结构:弹性分布式数据集 RDD
在面对出错情况(例如任意一台节点宕机)时,Spark 能通过 RDD 之间的依赖关系恢复任意出错的 RDD(如 B 和 D 可以算出最后的 RDD),RDD 就像一块海绵一样,无论怎么挤压,都像海绵一样完整;
在经过转换算子处理时,RDD 中的分区数以及分区所在的位置随时都有可能改变。
分区的集合;
用来基于分区进行计算的函数(算子);
依赖(与其他 RDD)的集合;
对于键-值型的 RDD 的散列分区器(可选);
// 表示RDD之间的依赖关系的成员变量
private var deps: Seq[Dependency[_]]
// 分区器成员变量
val partitioner: Option[Partitioner] = None
// 该RDD所引用的分区集合成员变量
private var partitions_ : Array[Partition] = null
// 得到该RDD与其他RDD之间的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 得到该RDD所引用的分区
protected def getPartitions: Array[Partition]
// 得到每个分区地址
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// distinct算子
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
@transient private var partitions_ : Array[Partition] = null
Spark编程是一件不难的工作,而事实也确实如此。现在我们可以通过已有的 SparkSession 直接创建 RDD。创建RDD的方式有以下几类:通过并行集合创建RDD;从HDFS中加载数据创建RDD;从linux本地文件系统加载数据创建RDD。
了解了RDD的创建方式,接下来,我们逐个进行演示介绍:
//val spark: SparkSession = .......
val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))
//val spark: SparkSession = .......
val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")
//val spark: SparkSession = .......
val lowerBound = 1
val upperBound = 1000
val numPartition = 10
val rdd = new JdbcRDD(spark.sparkcontext,() => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
},
"SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
lowerBound,
upperBound,
numPartition,
r => r.getString(1)
)
//val spark: SparkSession = .......
val sc = spark.sparkcontext
val tablename = "your_hbasetable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// 利用HBase API解析出行键与列值
rdd_three.foreach{case (_,result) => {
val rowkey = Bytes.toString(result.getRow)
val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))
}
以上是关于Spark核心-RDD的主要内容,如果未能解决你的问题,请参考以下文章