用一个例子告诉你 怎样在spark中创建RDD

Posted 广阔天地大有可为

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用一个例子告诉你 怎样在spark中创建RDD相关的知识,希望对你有一定的参考价值。

目录

1. 前言

2. 分发驱动中scala集合中的数据

2.1 parallelize

2.2 makeRDD

2.3 range

3. 分发外部存储系统中的数据

3.1 textFile

3.2 wholeTextFiles


1. 前言

众所周知,spark是一种计算引擎(用来计算数据),但是数据从何而来呢?
       spark获取数据主要有两种方式:
             方式1:
                     分发驱动程序中scala集合中的数据
             方式2:
                     分发外部存储系统中的数据(HDFS、HBase、其他共享文件系统)

spark将读来的数据,分发到了哪里去?
       spark是一个分布式计算引擎,spark会将读取来的数据
                 按照指定的并行度,分发到不同的计算节点上去


2. 分发驱动中scala集合中的数据

spark提供了两个方法,用来将本地集合的数据(客户端JVM)切分成若干份

                                      然后再分发到不同的计算节点中去
    主要有两个参数:
             seq: Seq[T]       本地集合
             numSlices: Int    切片数(可选参数,不指定时使用默认切片数)

2.1 parallelize

  test("parallelize") 
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val list = List("java", "scala", "c++", "c#")

    // 指定切片数
    val rdd1: RDD[String] = sc.parallelize(list, 2)
    // 使用默认切片
    val rdd2: RDD[String] = sc.parallelize(list)

    sc.stop()
  

2.2 makeRDD

  test("makeRDD") 
    /*
    * TODO : 源码中 makeRDD 调用的还是 parallelize方法
    *
    * */

    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val list = List("java", "scala", "c++", "c#")

    // 指定切片数
    val rdd1: RDD[String] = sc.makeRDD(list, 2)
    // 使用默认切片
    val rdd2: RDD[String] = sc.makeRDD(list)

    sc.stop()
  

2.3 range

def range(start: Long,  end: Long,step: Long = 1,numSlices: Int = defaultParallelism): RDD[Long]

功能:
           创建一个Long类型的RDD,元素内容为 start到end,公差为step 的等差数列

参数:
           start: Long, 起始位置
           end: Long, 结束位置,不包括该位置
           step: Long = 1, 数列公差,默认为1
           numSlices: Int = defaultParallelism 切片数,不指定时使用默认切片数

使用场景:
          常用来造数据使用

  test("range") 
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[Long] = sc.range(0, 10)

    sc.stop()
  

3. 分发外部存储系统中的数据

spark提供了两个方法,用于将外部文件数据切片后,再分发到不同的计算节点上去
    主要有两个参数:
           path: String  指定文件系统URL
           minPartitions: Int  指定切片数(不指定时,使用默认切片数)


使用限制:
对文件系统的要求:
         读取的文件系统必须是 HDFS、本地文件系统、任何hadoop支持的文件系统
对读取文件的要求:
         文件格式必须是 text格式且UTF-8
对url的要求:
         支持单个文件  /my/directory/1.txt
         支持多个文件  /my/directory/*.txt
         支持目录         /my/directory (目录下的必须都是文件,不能有目录存在)
                 java.io.IOException: Path: /dir/dir2 is a directory, which is not supported by the record 
                 只能读取目录下的文件,不会对子目录进行遍历读取
         支持gz格式的压缩文件 /my/directory/*.txt


3.1 textFile

返回 RDD[String] 格式的rdd,每个元素内容为 读取到text文件的每行rdd的长度为所有文件的行数

  test("textFile") 
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    // 读取目录下的所有文件
    val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir")
    // 读取gz格式的压缩文件
    //val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir/1.txt.gz")
    rdd.foreach(println(_))

    sc.stop()
  

3.2 wholeTextFiles

返回 RDD[(String, String)] 格式的rdd,每个元素内容为 (文件名称,文件内容),rdd的长度为读取到的文件个数

  test("wholeTextFiles") 
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    /*
    * TODO data/dir目录下 虽然存在目录不会报错,但是读取时会过滤掉目录,并不会递归读取子目录
    * */
    val rdd: RDD[(String, String)] = sc.wholeTextFiles("src/main/resources/data/dir")
    rdd.foreach(e => println(s"fileName:$e._1   data:$e._2"))

    sc.stop()
  

 

以上是关于用一个例子告诉你 怎样在spark中创建RDD的主要内容,如果未能解决你的问题,请参考以下文章

用一个例子告诉你 怎样在Flink DataStream API 中读取数据源

火花在UDF中创建数据框

10 张图告诉你 RocketMQ 是怎样保存消息的

10 张图告诉你 RocketMQ 是怎样保存消息的

怎样用postgresql建表,建数据库

怎样在swift中创建一个CocoaPods