Spark - RDD 算子介绍及使用 ScalaJavaPython 三种语言演示

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark - RDD 算子介绍及使用 ScalaJavaPython 三种语言演示相关的知识,希望对你有一定的参考价值。

一、RDD 的起源

RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行流程如下:

多个 MapReduce 任务之间只能通过磁盘来进行传递数据,很明显的效率低下,再来看 RDD 的处理方式:


整个过程是共享内存的, 而不需要将中间结果存放在分布式文件系统中,这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度。

二、RDD 的特点

RDD 不仅是数据集, 也是编程模型,提供了上层 API, 同时 RDDAPIjdk8stream 流对集合运算的 API 非常类似,同样也都是各算子,如下:

textFile.filter(StringUtils.isNotBlank) //过滤空内容
  .flatMap(_.split(" ")) //根据空格拆分
  .map((_, 1)) // 构建新的返回
  .foreach(s => println(s._1 + "  " + s._2)) //循环

RDD 的算子大致分为两类:

  • Transformation 转换操作, 例如 map flatMap filter 等。
  • Action 动作操作, 例如 reduce collect show

注意:执行 RDD 的时候会进行惰性求值,执行到转换操作的时候,并不会立刻执行,直到遇见了 Action 操作,才会触发真正的执行。

创建 RDD

RDD 有三种创建方式,可以通过本地集合直接创建,也可以通过读取外部数据集来创建,还可以通过其它的 RDD 衍生而来:

首先声明 SparkContext

  • scala:
val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc = new SparkContext(conf)
  • java
SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
  • python
from pyspark import SparkConf, SparkContext, StorageLevel
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)

1. 通过集合创建

  • scala
val rdd1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
//指定分区
val rdd2 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""), 5)
  • java
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
//指定分区
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""), 5);
  • python
rdd1 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""])
#
rdd2 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""], 5)

2. 通过文件创建

  • scala
 //读取本地文件
 val rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
 //读取本地文件,指定分区
 val rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
 //读取 HDFS 文件
 val rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
 //读取文件同时拿到文件名
 val rdd6 = sc.textFile("hdfs://test/spark/input3/")
  • java
//读取本地文件
JavaRDD<String> rdd3 = sc.textFile("D:/test/spark/input3/words.txt");
//读取本地文件,指定分区
JavaRDD<String> rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5);
//读取 HDFS 文件
JavaRDD<String> rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt");
//读取文件同时拿到文件名
JavaRDD<String> rdd6 = sc.textFile("hdfs://test/spark/input3/");
  • python
# 读取本地文件
rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
#读取本地文件,指定分区
rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
#读取 HDFS 文件
rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
#读取文件同时拿到文件名
rdd6 = sc.textFile("hdfs://test/spark/input3/")

下面对相关常用算子进行演示。

三、Transformations 算子

1. map

RDD 中的数据 一对一 的转为另一种形式:

例如:

  • scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(
  num.map(_+1).collect().toList
)
  • java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(
       num.map(i -> i + 1).collect()
);
  • python:
num = sc.parallelize((1, 2, 3, 4, 5))
print(
    num.map(lambda i:i+1).collect()
)

2. flatMap

Map 算子类似,但是 FlatMap 是一对多,并都转化为一维数据:

例如:

  • scala:
val text = sc.parallelize(Seq("abc def", "hello word", "dfg,okh", "he,word"))
println(
  text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def", "hello word", "dfg,okh", "he,word"));
System.out.println(
        text.flatMap(s ->Arrays.asList(s.split(" ")).iterator())
                .flatMap(s ->Arrays.asList(s.split(",")).iterator())
                .collect()
);
  • python:
text = sc.parallelize(("abc def", "hello word", "dfg,okh", "he,word"))
print(
    text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
)

3. filter

过滤掉不需要的内容:

例如:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"))
println(
  text.filter(_.equals("hello")).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"));
System.out.println(
        text.filter(s -> Objects.equals(s,"hello"))
                .collect()
);
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"))
print(
    text.filter(lambda s: s == 'hello').collect()
)

4. mapPartitions

map 类似,针对整个分区的数据转换,拿到的是每个分区的集合:

例如:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(
  text.mapPartitions(iter => 
    iter.map(_ + "333")
  ).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(
        text.mapPartitions(iter -> 
            List<String> list = new ArrayList<>();
            iter.forEachRemaining(s -> list.add(s+"333"));
            return list.iterator();
        ).collect()
);
  • python:
 text = sc.parallelize(("hello", "hello", "word", "word"), 2)
 
 def partition(par):
     tmpArr = []
     for s in par:
         tmpArr.append(s + "333")
     return tmpArr

 print(
     text.mapPartitions(partition).collect()
 )

5. mapPartitionsWithIndex

mapPartitions 类似, 只是在函数中增加了分区的 Index

例如:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(
  text.mapPartitionsWithIndex((index, iter) => 
    println("当前分区" + index)
    iter.map(_ + "333")
  , true).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(
       text.mapPartitionsWithIndex((index, iter) -> 
           System.out.println("当前分区" + index);
           List<String> list = new ArrayList<>();
           iter.forEachRemaining(s -> list.add(s + "333"));
           return list.iterator();
       , true).collect()
);
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"), 2)

def partition(index, par):
    print("当前分区" + str(index))
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(
    text.mapPartitionsWithIndex(partition).collect()
)

6. mapValues

只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value

例如:

  • scala:
val text = sc.parallelize(Seq("abc", "bbb", "ccc", "dd"))
println(
  text.map((_, "v" + _))
    .mapValues(_ + "66")
    .collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc", "bbb", "ccc", "dd"));
System.out.println(
       text.mapToPair(s -> new Tuple2<>(s, "v" + s))
               .mapValues(v -> v + "66").collect()
);
  • python:
text = sc.parallelize(("abc", "bbb", "ccc", "dd"))
print(
    text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
)

7. sample

可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失:

第一个参数为withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说,如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复。

第二个参数为fraction, 意为抽样的比例。

第三个参数为seed, 随机数种子, 用于 Sample 内部随机生成下标,一般不指定,使用默认值。

例如:

  • scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(
  num.sample(true,0.6,2)
    .collect().toList
)
  • java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(
    num.sample(true, 0.6, 2).collect()
);
  • python:
num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(
    num.sample(True, 0.6, 2).collect()
)

8. union

两个数据并集,类似于数据库的 union

例如:

  • scala:
val text1 = sc.parallelize(Seq("aa", "bb"))
val text2 = sc.parallelize(Seq("cc", "dd"))
println(
  text1.union(text2).collect().toList
)
  • java:
JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa", "bb"));
JavaRDD<String> text2 = sc以上是关于Spark - RDD 算子介绍及使用 ScalaJavaPython 三种语言演示的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Core RDD行动算子

Spark-Core RDD转换算子-双Value型交互

Spark-RDD算子

Spark RDD算子案例:两种方式计算学生总分

spark算子介绍

spark算子:partitionBy对数据进行分区