大数据(8e)RDD常用算子

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(8e)RDD常用算子相关的知识,希望对你有一定的参考价值。

0、概述

  • RDD算子:RDD里面实现的方法
  • 包括转换算子行动算子

1、转换算子(Transformations)

  • 转换算子执行后,会返回新RDD,但不会立即执行计算(惰性加载)
方法名(参数)说明
map(func)将源RDD的每个元素传给函数处理,返回新RDD
filter(func)将源RDD的每个元素传给函数,保留返回true的元素
flatMap(func)map,然后扁平化处理
mapPartitions(func)以分区为单位进行map
mapPartitionsWithIndex(func)以分区为单位,并且带分区编号,进行map
sample(withReplacement, fraction, seed)抽样(有放回或无放回)
distinct([numPartitions]))去重
groupByKey([numPartitions])(K,V)型RDD专用,对同 K 的 V 聚到同一组,返回(K,Iterable <V>)型RDD
reduceByKey(func, [numPartitions])(K,V)型RDD专用,对同 K 的 V 进行reduce
sortByKey([ascending], [numPartitions])(K,V)型RDD专用,按 K 升序或降序
pipe(command, [envVars])RDD的每个分区元素 通过shell命令(如Bash脚本)写入 stdin,输出到 stdout 的行 将返回新的字符串型RDD
coalesce(numPartitions)重新分区,shuffle机制默认false(常用于缩减分区)
repartition(numPartitions)重新分区,shuffle机制默认true(常用于扩增分区)
mapValues只对KV型RDD中的Value进行操作,场景:页面单跳率
combineByKey场景:分组求平均

1.1、分区内元素汇聚成数组:glom

val r0 = sc.makeRDD(Range(0, 5), 2)
val r1 = r0.glom()
r1.mapPartitionsWithIndex(
  (index, items) => items.map(a => ("分区" + index, a.toList))
).collect.foreach(println)
/*
(分区0,List(0, 1))
(分区1,List(2, 3, 4))
*/

1.2、两个RDD运算

方法名(参数)说明1D2D
union(otherDataset)两RDD拼接11
intersection(otherDataset)交集(不是高中数学的那个逻辑)11
subtract(otherDataset)差集(不是高中数学书那个逻辑)11
zip拉链11
cartesian(otherDataset)笛卡尔11
subtractByKey按Key取差集1
join(otherDataset, [numPartitions])内联1
fullOuterJoin全联1
leftOuterJoin左联1
rightOuterJoin右联1
cogroup(otherDataset, [numPartitions])1
groupWith1

一维

val rdd0 = sc.makeRDD(Seq(1, 2, 2, 3))
val rdd1 = sc.makeRDD(Seq(4, 3, 2))
println(rdd0.union(rdd1).collect.toList)
//List(1, 2, 3, 4, 3, 2)
println(rdd0.intersection(rdd1).collect.toList)
//List(3, 2)
println(rdd0.subtract(rdd1).collect.toList)
//List(1)
println(rdd0.zip(rdd1).collect.toList)
//List((1,4), (2,3), (3,2))
println(rdd0.cartesian(rdd1).collect.toList)
//List((1,4), (1,3), (1,2), (2,4), (2,3), (2,2), (3,4), (3,3), (3,2))

二维

val rdd0 = sc.makeRDD(Seq((1, 1), (2, 2), (3, 3)))
val rdd1 = sc.makeRDD(Seq((4, -4), (3, -3), (2, -2)))
println(rdd0.join(rdd1).collect.toList)
//List((3,(3,-3)), (2,(2,-2)))
println(rdd0.fullOuterJoin(rdd1).collect.toList)
//List((4,(None,Some(-4))), (1,(Some(1),None)), (3,(Some(3),Some(-3))), (2,(Some(2),Some(-2))))
println(rdd0.leftOuterJoin(rdd1).collect.toList)
//List((1,(1,None)), (3,(3,Some(-3))), (2,(2,Some(-2))))
println(rdd0.rightOuterJoin(rdd1).collect.toList)
//List((4,(None,-4)), (3,(Some(3),-3)), (2,(Some(2),-2)))
println(rdd0.subtractByKey(rdd1).collect.toList)
//List((1,1))
println(rdd1.subtractByKey(rdd0).collect.toList)
//List((4,-4))

2、行动算子(Actions)

  • 行动算子执行后,会触发计算
方法名(参数)说明
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
count()统计元素个数
first()返回RDD中第1个元素
take(n)以数据形式返回RDD中前n个元素
takeSample(withReplacement, num, [seed])抽样,返回num个元素的数组
takeOrdered(n, [ordering])自定义排序后,返回前n个元素的数据
saveAsTextFile(path)将RDD元素以文本形式保存,可存本地或HDFS
countByKey()(K,V)型RDD专用,统计各 K 下 V 个数,返回(K,Int)
foreach(func)对RDD每个元素传到函数执行

2.1、collect

  • 以数组形式返回RDD中所有元素,数据量大情况下慎用!
  • collect源码:调用SparkContext对象scrunJob方法
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

2.2、reduce、take、sum、countByValue

val r1 = sc.makeRDD(Seq(9, 7, 5, 3, 1), numSlices = 2)
println(r1.reduce(_ + _))  // 25
println(r1.fold(25)(_ + _))  // 100
println(r1.aggregate(25)(_ + _, _ + _))  // 100
println(r1.first)  // 9
println(r1.take(3).toList)  // List(9, 7, 5)
println(r1.takeOrdered(3).toList)  // List(1, 3, 5)
println(r1.takeSample(withReplacement = true, num = 4).toList)
// List(5, 9, 9, 5) 有重复
println(r1.takeSample(withReplacement = false, num = 4).toList)
// List(1, 5, 7, 3)
println(r1.sum)  // 25.0
println(r1.count)  // 5
println(r1.countByValue())  // Map(5 -> 1, 1 -> 1, 9 -> 1, 7 -> 1, 3 -> 1)

2.3、countByKey

val r2 = r1.map((2, _))
println(r2.countByKey())  // Map(2 -> 5)
r2.foreach(print)  // (2,5)(2,3)(2,1)(2,9)(2,7)

以上是关于大数据(8e)RDD常用算子的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Spark:Spark调优之RDD算子调优

大数据-spark理论算子,shuffle优化

Spark——RDD算子

大数据:Spark Core用LogQuery的例子来说明Executor是如何运算RDD的算子

大数据之Spark:Spark Core

大数据之Spark:Spark调优之RDD算子调优