Spark RDD 初探——Spark开发学习笔记

Posted katus

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RDD 初探——Spark开发学习笔记相关的知识,希望对你有一定的参考价值。

Spark RDD 初探

弹性分布式数据集(Resilient Distributed Dataset,RDD)是Spark中的核心概念,基本上所有的Spark运算操作对象都是RDD,我们今天就来简单认识一下这个RDD。
说明:由于本文的开发基于Java,因此所有的观点都是基于Java的。

一、RDD 创建

RDD是一种数据集,初步我们可以将其想象成一个数组类似的数据结构,先不去管实际的存储结构。RDD的创建方式可以划分成两大类。

(一)从Driver程序的数据集生成RDD

直说就是程序本身数据生成RDD,不是从外部导入的数据。一般是通过SparkContext对象(实际为JavaSparkContext)的parallelize方法来创建RDD。

SparkSession session = SparkSession.builder().getOrCreate();
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(session.sparkContext());
List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5);
List<String> list2 = Arrays.asList("hello", "RDD", "Spark");
JavaRDD<Integer> myRDD1 = sparkContext.parallelize(list1);
JavaRDD<String> myRDD2 = sparkContext.parallelize(list2);

上述代码中myRDD1和myRDD2均为通过程序的数据集生成的RDD。

Java开发中RDD(实际为JavaRDD)只能通过List数据类型来生成。

(二)从外部数据集生成RDD

从外部数据集加载的相关方法有很多,包括(可能不限于)如下列举的方法。

  • textFile方法,从文本文件加载。
  • hadoopFile方法,从Hadoop文件加载。
  • sequenceFile方法。
  • objectFile方法,读取序列化文件。
  • binaryFiles方法,以二进制格式直接读取Hadoop MapReduce计算的结果文件。
  • hadoopRDD方法,读取HBase文件。
JavaRDD<Integer> myRDD = sparkContext.textFile(inputFile);

二、RDD 操作

RDD的操作分为两大类,分别是Transformation(转换)和Action(动作)。Spark进行Transformation时采用lazy模式,即计算不是立刻执行,只有当Action操作触发时才会进行启动运算。

  • Transformation:由一个RDD生成另一个RDD的过程。
    • map(function)
      • 对RDD中的每个元素进行function操作,生成新元素构成的新RDD返回。
    • filter(function)
      • 对RDD中的元素进行过滤,如果调用函数返回true则保留,返回过滤后的RDD。
    • flatMap(function)
      • 与map类似,但是每个元素调用之后可能会产生0至多个元素,将这些所有的元素全部扁平化构成一个新的RDD,要求function的返回类型为Seq类型。
    • mapPartitions(function)
      • 与map类似,但是function的作用对象是一整个分区,即Iterator -> Iterator。
    • mapPartitionsWithIndex(function)
      • 与mapPartitions类似,但是function的输入会多一个整形变量,表示分区编号,即Int, Iterator -> Iterator。
    • sample(withReplacement, fraction, seed)
      • 对RDD中的元素进行抽样,withReplacement(布尔值)表示是否放回,fraction(浮点型)表示抽样比例,seed表示随机数种子。
    • union(otherDataSet)
      • 合并两个RDD,不去重,要求类型完全一致。
    • intersection(otherDataSet)
    • distinct([numTasks])
      • 对RDD进行去重操作。
    • groupByKey([numTasks])
      • 对键值对形式的RDD进行按照键分组的操作,返回RDD形式为<key, Iterable>。
    • reduceByKey(function, [numTasks])
      • 对键值对形式的RDD进行按照键聚合的操作,function必须为V, V -> V的形式。
    • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    • sortByKey([ascending], [numTasks])
      • 对键值对形式的RDD进行按照键排序的操作。
    • join(otherDataSet, [numTasks])
      • 对(K, V)和(K, W)进行join操作得到(K, (V, W)),其他连接操作包括leftOuterJoin、rightOuterJoin和fullOuterJoin。
    • cogroup(otherDataSet, [numTasks])
    • cartesian(otherDataSet)
    • pipe(command, [envVars])
    • coalesce(numPartitions)
    • repartition(numPartitions)
    • repartitionAndSortWithinPartitions(partitioner)
  • Action:返回结果到Driver程序中,常表示运算完成。
    • reduce(function)
      • 对RDD进行reduce操作最终返回一个值。
    • collect()
      • 将RDD返回到Driver程序,类型为Array。
    • count()
      • 返回RDD的元素数量。
    • first()
      • 返回RDD的第一个元素,等价于take(1)。
    • takeSample(withReplacement, num, [seed])
    • take(n)
      • 返回RDD的前n个元素。
    • takeOrdered(n, [ordering])
    • saveAsTextFile(path)
      • 将RDD转化为文本格式存储在path下。
    • saveAsSequenceFile(path)
      • 与saveAsTextFile类似,但是存储格式为SequenceFile。
    • saveAsObjectFile(path)
    • countByKey()
      • 对键值对形式的RDD返回key的数量。
    • foreach(function)
      • 对RDD中的每个成员执行function,对RDD没有影响。(常用于更新计数器之类的操作)

下面的代码是一段从文本文件读入然后统计每个单词数量并写出到文件的代码。

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;

public class WordCount 
    public static void main(String[] args) 
        String inputFile = "D:\\\\Data\\\\words.txt";
        String outputFile = "D:\\\\Data\\\\result";
        SparkSession session = SparkSession.builder().getOrCreate();
        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(session.sparkContext());
        sparkContext.textFile(inputFile)
                // 对于每个段落,进行split操作,然后将单词与1配对,扁平化,最终形成JavaPairRDD
                .flatMapToPair(s -> 
                    String[] words = s.split("\\\\s");
                    List<Tuple2<String, Integer>> r = new ArrayList<>();
                    for (String word : words) 
                        r.add(new Tuple2<>(word, 1));
                    
                    return r.iterator();
                )
                // 对于每个元组对,按照key合并,后面的数字累加
                .reduceByKey(Integer::sum)
                // 将结果保存为文本文件
                .saveAsTextFile(outputFile);
        session.stop();
    

如果我想把结果输出显示在控制台,而不输出到文件。

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;

public class WordCount 
    public static void main(String[] args) 
        String inputFile = "D:\\\\Data\\\\words.txt";
        SparkSession session = SparkSession.builder().getOrCreate();
        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(session.sparkContext());
        sparkContext.textFile(inputFile)
                // 对于每个段落,进行split操作,然后将单词与1配对,扁平化,最终形成JavaPairRDD
                .flatMapToPair(section -> 
                    String[] words = section.split("\\\\s");
                    List<Tuple2<String, Integer>> list = new ArrayList<>();
                    for (String word : words) 
                        list.add(new Tuple2<>(word, 1));
                    
                    return list.iterator();
                )
                // 对于每个元组对,按照key合并,后面的数字累加
                .reduceByKey(Integer::sum)
                // 将RDD返回到Driver程序
                .collect()
                // 对于RDD中的每个成员调用输出函数
                .forEach(WordCount::print1);
        session.stop();
    

    // 自定义的静态输出函数
    private static void print1(Tuple2<String, Integer> tu) 
        System.out.println(tu.toString());
    

上述RDD操作都是用了函数作为参数,事实上,Spark严重依赖于传递函数类型的参数,常见的RDD操作都需要提供一个函数作为操作方法的参数。

Spark运算的核心是RDD,而RDD的运算又是分布式的,因此虽然代码看上去都是在本地运行,但是实际上都不在本地计算。中心结点会将计算所依赖的全部变量、方法打包在一起序列化发送到各个结点,各个结点各自进行反序列化,然后进行运算,最终将运算结果发送到中心结点。在这种机制下,RDD操作是绝对不能嵌套调用的,只能进行顺序操作。

在各种运算场景下,有部分场景要求RDD是键值对的形式,即<key, value>的形式。这种形式的RDD为PairRDD,经常进行shuffle操作,比如按key进行分组或者聚合。

三、RDD持久化和共享变量

(一)RDD持久化

通常来说,如果一个中间结果RDD被多次利用,将其存入缓存可以极大程度上提高运算效率,在开发中RDD持久化通常是通过persist方法来实现的。

sparkContext.textFile(inputFile).persist(StorageLevel.MEMORY_ONLY());

这种持久化有多种类型,包括如下几种。

StorageLevel意义
MEMORY_ONLY仅持久化到内存
MEMORY_AND_DISK持久化到内存,不够时使用磁盘
MEMORY_ONLY_SER序列化数据后仅持久化到内存
MEMORY_AND_DISK_SER序列化数据后持久化到内存,不够时使用磁盘
DISK_ONLY仅持久化到磁盘

另外可以使用unpersist函数进行去持久化。

(二)共享变量

1.广播变量

在每个结点上都有一份缓存的变量,并且是只读的,没有修改其值的意义。

Broadcast<Integer> broadcastVar1 = sparkContext.broadcast(1);
Broadcast<ArrayList> broadcastVar2 = sparkContext.broadcast(new ArrayList<String>() 
            add("first");
            add("second");
            add("third");
        );

2.计数器

计数器只能增加,通常用来计数或者求和,Spark在Java中没有直接的计数器支持,需要自己重写。

对于Spark RDD的更进一步的学习,我们放在以后进行。

以上是关于Spark RDD 初探——Spark开发学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

scala spark 机器学习初探

Spark RDD初探

Spark基础学习笔记16:创建RDD

Spark笔记:RDD基本操作(上)

Spark笔记:复杂RDD的API的理解(上)

Spark基础学习笔记17:掌握RDD算子