Spark API 详解/大白话解释 之 groupBy、groupByKey

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark API 详解/大白话解释 之 groupBy、groupByKey相关的知识,希望对你有一定的参考价值。

参考技术A groupByKey实例分析Spark Hash Shuffle - 郭同jet · 静心 - 博客频道 - CSDN.NET
http://blog.csdn.net/guotong1988/article/details/49465847

Spark API 详解/大白话解释 之 groupBy、groupByKey - 郭同jet · 静心 - 博客频道 - CSDN.NET
http://blog.csdn.net/guotong1988/article/details/50556871

groupBy(function) function返回key,传入的RDD的各个元素根据这个key进行分组
val a = sc.parallelize(1 to 9, 3)a.groupBy(x => if (x % 2 == 0) "even" else "odd" ).collect//分成两组/ 结果 Array((even,ArrayBuffer(2, 4, 6, 8)),(odd,ArrayBuffer(1, 3, 5, 7, 9))) /
1
2
3
4
5
6
7
8

1
2
3
4
5
6
7
8

val a = sc.parallelize(1 to 9, 3)def myfunc(a: Int) : Int = a % 2//分成两组a.groupBy(myfunc).collect
1
2
3
4
5
6

1
2
3
4
5
6

/* 结果 Array( (0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)) ) */
groupByKey( )
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)val b = a.keyBy(_.length)//给value加上key,key为对应string的长度b.groupByKey.collect//结果 Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))
1
2
3
4

1
2
3
4


0

大数据之Spark:Spark Core

目录

1. RDD 详解

1) 为什么要有 RDD?

许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。

MapReduce 框架采用非循环式的数据流模型,把中间结果写入到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。

RDD 提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同 RDD 之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销,并且还提供了更多的 API(map/reduec/filter/groupBy…)。

2) RDD 是什么?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。单词拆解:

Resilient :它是弹性的,RDD 里面的中的数据可以保存在内存中或者磁盘里面;

Distributed :它里面的元素是分布式存储的,可以用于分布式计算;

Dataset: 它是一个集合,可以存放很多元素

3) RDD 主要属性

进入 RDD 的源码中看下:

在源码中可以看到有对 RDD 介绍的注释,我们来翻译下:

A list of partitions :一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,分片数决定并行度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。

A function for computing each split :一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。

A list of dependencies on other RDDs :一个 RDD 会依赖于其他多个 RDD。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制)

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):可选项,对于 KV 类型的 RDD 会有一个 Partitioner,即 RDD 的分区函数,默认为 HashPartitioner。

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):可选项,一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。

总结

RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:

1.分区列表
2.计算函数
3.依赖关系
4.分区函数(默认是 hash)
5.最佳位置
分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;
计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。

2. RDD-API

1) RDD 的创建方式

1、由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等:

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

2、通过已有的 RDD 经过算子转换生成新的 RDD:

val rdd2=rdd1.flatMap(_.split(" "))

3、由一个已经存在的 Scala 集合创建:

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD 方法底层调用了 parallelize 方法:

2) RDD 的算子分类

RDD 的算子分为两类:

1、Transformation转换操作:返回一个新的 RDD

2、Action动作操作:返回值不是 RDD(无返回值或返回其他的)

❣️ 注意:
1、RDD 不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)。
2、RDD 中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给 Driver 的 Action 动作时,这些转换才会真正运行。
3、之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG 有向无环图进行 Stage 的划分和并行优化,这种设计让 Spark 更加有效率地运行。

3) Transformation 转换算子

转换算子含义
map(func)返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成
filter(func)返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成
flatMap(func)类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)
mapPartitions(func)类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子
union(otherDataset)对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
intersection(otherDataset)对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
distinct([numTasks]))对源 RDD 进行去重后返回一个新的 RDD
groupByKey([numTasks])在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD
reduceByKey(func, [numTasks])在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])对 PairRDD 中相同的 Key 值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和 aggregate 函数类似,aggregateByKey 返回值的类型不需要和 RDD 中 value 的类型一致
sortByKey([ascending], [numTasks])在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD
sortBy(func,[ascending], [numTasks])与 sortByKey 类似,但是更灵活
join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD
cogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
cartesian(otherDataset)笛卡尔积
pipe(command, [envVars])对 rdd 进行管道操作
coalesce(numPartitions)减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
repartition(numPartitions)重新给 RDD 分区

其中map、filter、flatMap、mapPartitions、mapPartitionsWithIndex、union、distinct、groupByKey、reduceByKey、sortByKey、join、coalesce、repartition算子应用较为普遍,需特别留意;

4) Action 动作算子

动作算子含义
reduce(func)通过 func 函数聚集 RDD 中的所有元素,这个功能必须是可交换且可并联的
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回 RDD 的元素个数
first()返回 RDD 的第一个元素(类似于 take(1))
take(n)返回一个由数据集的前 n 个元素组成的数组
takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子
takeOrdered(n, [ordering])返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path)将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本
saveAsSequenceFile(path)将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统
saveAsObjectFile(path)将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey()针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数
foreach(func)在数据集的每一个元素上,运行函数 func 进行更新
foreachPartition(func)在数据集的每一个分区上,运行函数 func

其中saveAsTextFile、saveAsSequenceFile、countByKey、foreachPartition、collect、count、take、reduce算子应用较为普遍,需特别留意;

4) RDD 算子练习

需求:
给定一个键值对 RDD:

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))

key 表示图书名称,value 表示某天图书销量

请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

最终结果:(“spark”,4),(“hadoop”,5)。
答案 1:

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
rdd2.mapValues(v=>v.sum/v.size).collect
Array[(String, Int)] = Array((spark,4), (hadoop,5))

答案 2:

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))

val rdd3 = rdd2.map(t=>(t._1,t._2.sum /t._2.size))
rdd3.collect
//Array[(String, Int)] = Array((spark,4), (hadoop,5))

3. RDD 的持久化/缓存

在实际开发中某些 RDD 的计算或转换可能会比较耗费时间,如果这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

持久化/缓存 API 详解

1、persist 方法和 cache 方法
RDD 通过 persist 或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中):

2、存储级别
默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在 object StorageLevel 中定义的。

持久化级别说明
MORY_ONLY(默认)将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果没有足够的内存存储 RDD,则某些分区将不会被缓存,每次需要时都会重新计算。这是默认级别
MORY_AND_DISK(开发中可以使用这个)将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取
MEMORY_ONLY_SER (Java and Scala)将 RDD 以序列化的 Java 对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的 CPU
MEMORY_AND_DISK_SER (Java and Scala)与 MEMORY_ONLY_SER 类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们
DISK_ONLY将 RDD 分区存储在磁盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2 等与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上
OFF_HEAP(实验中)与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。(即不是直接存储在 JVM 内存中)

总结:

1、RDD 持久化/缓存的目的是为了提高后续操作的速度
2、缓存的级别有很多,默认只存在内存中,开发中使用 memory_and_disk
3、只有执行 action 操作的时候才会真正将 RDD 数据进行持久化/缓存
4、实际开发中如果某一个 RDD 后续会被频繁的使用,可以将该 RDD 进行持久化/缓存

以上是关于Spark API 详解/大白话解释 之 groupBy、groupByKey的主要内容,如果未能解决你的问题,请参考以下文章

spark2.x由浅入深深到底系列六之RDD java api详解四

Spark API 之 combineByKey

Spark Structured Streaming框架之窗口管理详解

spark 内存管理机制

新版白话空间统计:莫兰指数之计算详解

Spark RDD API详解(转)