Spark-RDD创建并行分区

Posted open_test01

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-RDD创建并行分区相关的知识,希望对你有一定的参考价值。

集合(内存)中创建 RDD

外部存储(文件)创建 RDD

并行与分区

分区的设定


集合(内存)中创建 RDD

        从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD,从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

parallelize方法代表并行处理

def main(args: Array[String]): Unit = 
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //创建RDD
    //从内存中创建RDD 将内存中集合的数据最为处理的数据源
    //定义一个集合
    val seq = Seq[Int](1,2,3,4,5,6,7)
    //val rdd: RDD[Int] = sc.parallelize(seq)
    val rdd: RDD[Int] = sc.makeRDD(seq)

    rdd.collect().foreach(println)

    //关闭环境
    sc.stop()
  

外部存储(文件)创建 RDD

def main(args: Array[String]): Unit = 
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //创建RDD
     //从文件中创建RDD 将文件中的数据最为处理的数据源
        //path路径默认以当前环境的根路径为基准。可以写绝对路径也可以写相对路径
        //也可以是目录名
        //也可以是分布式路径HDFS
    //sc.textFile("D:\\\\spark.test\\\\datas")
    val rdd: RDD[String] = sc.textFile("datas\\\\a.txt")

    rdd.collect().foreach(println)

    //关闭环境
    sc.stop()
  

        读取目录是如果想知道具体文件来源于哪个文件用wholeTextFiles方法

def main(args: Array[String]): Unit = 
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //创建RDD
     //从文件中创建RDD 将文件中的数据最为处理的数据源
      //wholeTextFiles方法读取的内容为元组 第一个参数表路径 第二个参数为文件内容
    val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")//以文件为单位读取数据

    rdd.collect().foreach(println)

    //关闭环境
    sc.stop()
  

 并行与分区

        默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里 的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

def main(args: Array[String]): Unit = 
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //创建RDD
      //RDD并行度&分区 makeRDD(集合,切片数量)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

    //将处理数据保存成分区文件
    rdd.saveAsTextFile("output")

    //关闭环境
    sc.stop()

运行结束后产生的分区文件在output目录 中

分区的设定

def main(args: Array[String]): Unit = 
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //创建RDD
      //textFile可以将文件作为数据处理的数据源,默认也可以设定分区 textFile(路径,分区数量)
        //minPartitions:最小分区数量 默认分区为2 
    val rdd: RDD[String] = sc.textFile("datas\\\\a.txt",3)
    rdd.saveAsTextFile("output")
    //关闭环境
    sc.stop()
  

 分区数据的分配

  1. 数据以行为单位进行读取 spark采用hadoop的方式读取数据
  2. 数据读取时以偏移量为单位 偏移量不会被重复读取
  3. 数据分区的偏移量范围的计算 行=>[行第一个元素偏移量,结尾不包含元素的偏移量 ]

以上是关于Spark-RDD创建并行分区的主要内容,如果未能解决你的问题,请参考以下文章

Spark-RDD

Spark-RDD 模型 以及运行原理

Spark-RDD/DataFrame/DateSet

入门大数据---Spark_RDD

Spark 并行度

有没有办法在分区的 spark 数据集上并行运行操作?