浅谈Spark RDD

Posted 山人彤

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈Spark RDD相关的知识,希望对你有一定的参考价值。

1 RDD是什么

RDD, 全称为 Resilient Distributed Datasets, 是一个容错的, 并行的数据结构, 可以让用户显式地将数据存储到磁盘和内存中, 并能控制数据的分区。同时, RDD 还提供了一组丰富的操作来操作这些数据。在这些操作中, 诸如 map,flatMap,filter 等转换操作实现了 Monad 模式, 很好地契合了 Scala 的集合操作. 除此之外, RDD 还提供了诸如 join, groupBy, reduceByKey 等更为方便的操作, 以支持常见的数据运算。

通常来讲, 针对数据处理有几种常见模型, 包括: Iterative Algorithms, Relational Queries, MapReduce, Stream Processing。例如 Hadoop MapReduce 采用了 MapReduce 模型, Storm 则采用了 Stream Processing 模型. RDD 混合了这四种模型, 使得 Spark 可以应用于各种大数据处理场景。

2 RDD为什么会出现

在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?

多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享,这种方式明显比较低效。RDD 如何解决迭代计算非常低效的问题呢?

浅谈Spark RDD

在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中

这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现:

// 线性回归
val points = sc.textFile(...)
 .map(...)
 .persist(...)
val w = randomValue
for (i <- 1 to 10000) 
{
    val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
     .reduce(_ + _)
    w -= gradient
}

在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 熟快熟慢一窥便知

3 RDD特点

3.1 RDD 是一个数据模型

  1. RDD 允许用户显式的指定数据存放在内存或者磁盘
  2. RDD 是分布式的, 用户可以控制 RDD 的分区

3.2 RDD 是一个编程模型

  1. RDD 提供了丰富的操作
  2. RDD 提供了 map, flatMap, filter 等操作符, 用以实现 Monad 模式
  3. RDD 提供了 reduceByKey, groupByKey 等操作符, 用以操作 Key-Value 型数据
  4. RDD 提供了 max, min, mean 等操作符, 用以操作数字型的数据

3.3  RDD 可以分区

浅谈Spark RDD

RDD 是一个分布式计算框架, 所以, 一定是要能够进行分区计算的, 只有分区了, 才能利用集群的并行计算能力

同时, RDD 不需要始终被具体化, 也就是说: RDD 中可以没有数据, 只要有足够的信息知道自己是从谁计算得来的就可以, 这是一种非常高效的容错方式

3.4 RDD 是只读的

RDD 是只读的, 不允许任何形式的修改. 虽说不能因为 RDD 和 HDFS 是只读的, 就认为分布式存储系统必须设计为只读的. 但是设计为只读的, 会显著降低问题的复杂度, 因为 RDD 需要可以容错, 可以惰性求值, 可以移动计算, 所以很难支持修改。

  • RDD2 中可能没有数据, 只是保留了依赖关系和计算函数, 那修改啥?
  • 如果因为支持修改, 而必须保存数据的话, 怎么容错?
  • 如果允许修改, 如何定位要修改的那一行? RDD 的转换是粗粒度的, 也就是说, RDD 并不感知具体每一行在哪.

3.5 RDD 是可以容错的

  • RDD 的容错有两种方式

    1. 保存 RDD 之间的依赖关系, 以及计算函数, 出现错误重新计算

    2. 直接将 RDD 的数据存放在外部存储系统, 出现错误直接读取, Checkpoint

4 创建RDD

简略的说, RDD 有三种创建方式

  • RDD 可以通过本地集合直接创建

      def rddCreationLocal():Unit={
        //创建配置信息,设置主机名 local[2]表示本地虚拟两个线程的方式 设置应用名称
        // 1. 创建SparkConf
        val conf = new SparkConf().setMaster("local[2]").setAppName("demo01")
        // 2. 使用SparkConf创建conf
        val sc = new SparkContext(conf)
        // SparkConf作为大入口API,能创建RDD,并且设置参数,设置Jar包等

        val list = List(123456)
        //第一种方式 parallelize 可以不指定分区数
        // 第一个参数是本地集合 第二个参数是分区数量
        val rdd1:RDD[Int] = sc.parallelize(list, 2)
        //第二种方式 makeRDD
        val rdd2:RDD[Int] = sc.makeRDD(list, 2)
      }

  • RDD 也可以通过读取外部数据集来创建

      def rddCreationFiles():Unit={
        val conf = new SparkConf().setMaster("local[2]").setAppName("demo01")
        val sc = new SparkContext(conf)
        /**
         * 1.textFile传入的是什么?
         * 传入的是一个路径,读取路径
         * hdfs:// file:// /.../...(这种方式为在集群中执行还是在本地执行,在集群中,读取的是hdfs,本地是文件)
         * 2 是否支持分区?
         *  *假如传入的是path是 hdfs:///... 分区由hdfs文件的block决定的
         * 3. 支持什么平台
         *  支持aws和阿里云
         */

        val rdd1:RDD[String]= sc.textFile("file:///....")
      }
  • RDD 也可以通过其它的 RDD 衍生而来

    def rddCreateFromRDD():Unit={
        val conf = new SparkConf().setMaster("local[2]").setAppName("demo01")
        val sc = new SparkContext(conf)
        val rdd1 = sc.parallelize(Seq(1,2,3))

        /**
         * 通过在rdd上执行算子操作,会生成新的rdd
         * 原地计算
         * str.substr 返回新的字符串,非原地计算
         * 和字符串的方式很像,字符串是不可变的,rdd也是不可变的
         **/

        val rdd2 = rdd1.map(item=>item)
      }


以上是关于浅谈Spark RDD的主要内容,如果未能解决你的问题,请参考以下文章

spark浅谈:RDD

浅谈Spark算子

浅谈Spark内部运行机制

spark浅谈:SPARK核心编程

spark浅谈:

Spark——RDD算子