大数据(8e)RDD常用算子
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(8e)RDD常用算子相关的知识,希望对你有一定的参考价值。
发出火花的目录
0、概述
- RDD算子:RDD里面实现的方法
- 包括转换算子和行动算子
1、转换算子(Transformations)
- 转换算子执行后,会返回新RDD,但不会立即执行计算(惰性加载)
方法名(参数) | 说明 |
---|---|
map(func) | 将源RDD的每个元素传给函数处理,返回新RDD |
filter(func) | 将源RDD的每个元素传给函数,保留返回true的元素 |
flatMap(func) | 先map,然后扁平化处理 |
mapPartitions(func) | 以分区为单位进行map |
mapPartitionsWithIndex(func) | 以分区为单位,并且带分区编号,进行map |
sample(withReplacement, fraction, seed) | 抽样(有放回或无放回) |
distinct([numPartitions])) | 去重 |
groupByKey([numPartitions]) | (K,V)型RDD专用,对同 K 的 V 聚到同一组,返回(K,Iterable <V>)型RDD |
reduceByKey(func, [numPartitions]) | (K,V)型RDD专用,对同 K 的 V 进行reduce |
sortByKey([ascending], [numPartitions]) | (K,V)型RDD专用,按 K 升序或降序 |
pipe(command, [envVars]) | RDD的每个分区元素 通过shell命令(如Bash脚本)写入 stdin,输出到 stdout 的行 将返回新的字符串型RDD |
coalesce(numPartitions) | 重新分区,shuffle机制默认false(常用于缩减分区) |
repartition(numPartitions) | 重新分区,shuffle机制默认true(常用于扩增分区) |
mapValues | 只对KV型RDD中的Value进行操作,场景:页面单跳率 |
combineByKey | 场景:分组求平均 |
1.1、分区内元素汇聚成数组:glom
val r0 = sc.makeRDD(Range(0, 5), 2)
val r1 = r0.glom()
r1.mapPartitionsWithIndex(
(index, items) => items.map(a => ("分区" + index, a.toList))
).collect.foreach(println)
/*
(分区0,List(0, 1))
(分区1,List(2, 3, 4))
*/
1.2、两个RDD运算
方法名(参数) | 说明 | 1D | 2D |
---|---|---|---|
union(otherDataset) | 两RDD拼接 | 1 | 1 |
intersection(otherDataset) | 交集(不是高中数学的那个逻辑) | 1 | 1 |
subtract(otherDataset) | 差集(不是高中数学书那个逻辑) | 1 | 1 |
zip | 拉链 | 1 | 1 |
cartesian(otherDataset) | 笛卡尔 | 1 | 1 |
subtractByKey | 按Key取差集 | 1 | |
join(otherDataset, [numPartitions]) | 内联 | 1 | |
fullOuterJoin | 全联 | 1 | |
leftOuterJoin | 左联 | 1 | |
rightOuterJoin | 右联 | 1 | |
cogroup(otherDataset, [numPartitions]) | 1 | ||
groupWith | 1 |
一维
val rdd0 = sc.makeRDD(Seq(1, 2, 2, 3))
val rdd1 = sc.makeRDD(Seq(4, 3, 2))
println(rdd0.union(rdd1).collect.toList)
//List(1, 2, 3, 4, 3, 2)
println(rdd0.intersection(rdd1).collect.toList)
//List(3, 2)
println(rdd0.subtract(rdd1).collect.toList)
//List(1)
println(rdd0.zip(rdd1).collect.toList)
//List((1,4), (2,3), (3,2))
println(rdd0.cartesian(rdd1).collect.toList)
//List((1,4), (1,3), (1,2), (2,4), (2,3), (2,2), (3,4), (3,3), (3,2))
二维
val rdd0 = sc.makeRDD(Seq((1, 1), (2, 2), (3, 3)))
val rdd1 = sc.makeRDD(Seq((4, -4), (3, -3), (2, -2)))
println(rdd0.join(rdd1).collect.toList)
//List((3,(3,-3)), (2,(2,-2)))
println(rdd0.fullOuterJoin(rdd1).collect.toList)
//List((4,(None,Some(-4))), (1,(Some(1),None)), (3,(Some(3),Some(-3))), (2,(Some(2),Some(-2))))
println(rdd0.leftOuterJoin(rdd1).collect.toList)
//List((1,(1,None)), (3,(3,Some(-3))), (2,(2,Some(-2))))
println(rdd0.rightOuterJoin(rdd1).collect.toList)
//List((4,(None,-4)), (3,(Some(3),-3)), (2,(Some(2),-2)))
println(rdd0.subtractByKey(rdd1).collect.toList)
//List((1,1))
println(rdd1.subtractByKey(rdd0).collect.toList)
//List((4,-4))
2、行动算子(Actions)
- 行动算子执行后,会触发计算
方法名(参数) | 说明 |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
count() | 统计元素个数 |
first() | 返回RDD中第1个元素 |
take(n) | 以数据形式返回RDD中前n个元素 |
takeSample(withReplacement, num, [seed]) | 抽样,返回num个元素的数组 |
takeOrdered(n, [ordering]) | 自定义排序后,返回前n个元素的数据 |
saveAsTextFile(path) | 将RDD元素以文本形式保存,可存本地或HDFS |
countByKey() | (K,V)型RDD专用,统计各 K 下 V 个数,返回(K,Int) |
foreach(func) | 对RDD每个元素传到函数执行 |
2.1、collect
- 以数组形式返回RDD中所有元素,数据量大情况下慎用!
collect
源码:调用SparkContext
对象sc
的runJob
方法
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
2.2、reduce、take、sum、countByValue
val r1 = sc.makeRDD(Seq(9, 7, 5, 3, 1), numSlices = 2)
println(r1.reduce(_ + _)) // 25
println(r1.fold(25)(_ + _)) // 100
println(r1.aggregate(25)(_ + _, _ + _)) // 100
println(r1.first) // 9
println(r1.take(3).toList) // List(9, 7, 5)
println(r1.takeOrdered(3).toList) // List(1, 3, 5)
println(r1.takeSample(withReplacement = true, num = 4).toList)
// List(5, 9, 9, 5) 有重复
println(r1.takeSample(withReplacement = false, num = 4).toList)
// List(1, 5, 7, 3)
println(r1.sum) // 25.0
println(r1.count) // 5
println(r1.countByValue()) // Map(5 -> 1, 1 -> 1, 9 -> 1, 7 -> 1, 3 -> 1)
2.3、countByKey
val r2 = r1.map((2, _))
println(r2.countByKey()) // Map(2 -> 5)
r2.foreach(print) // (2,5)(2,3)(2,1)(2,9)(2,7)
以上是关于大数据(8e)RDD常用算子的主要内容,如果未能解决你的问题,请参考以下文章