Spark---RDD常用方法

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark---RDD常用方法相关的知识,希望对你有一定的参考价值。

前言:RDD算子分为Transformation(转换算子)、Action(行动算子)
转换算子—lazy—只记录操作,只有遇到行动算子才会真正的计算
行动算子—non-lazy
本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行

val rdd1 = sc.makeRDD(1 to 10)
val rdd2 = sc.parallelize(5 to 15)
val rdd3 = sc.makeRDD(Array(2,4,4,9,10,-2))
val rdd4 = sc.makeRDD(Array("hello","scala","python","helloWorld","javascript"))

++

两个RDD的组合,和Union作用一样

aggregate

和Scala中的一样

rdd1.aggregate(0)(_+_,_+_).collect

cartesian — 笛卡尔

rdd1.cartesian(rdd2).foreach(println)

coalesce — 分区

小—>大 :true; 大—>小 :false

rdd1.coalesce(3,true)

collect

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res1: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

checkpoint — 检查点

checkpoint 检查点机制,假设你在迭代1000次的计算中在第999次失败了,然后你没有checkpoint,你只能重新开始恢复了,如果恰好你在第998次迭代的时候你做了一个checkpoint,那么你只需要恢复第998次产生的rdd,然后再执行2次迭代完成总共1000的迭代,这样效率就很高,比较适用于迭代计算非常复杂的情况,也就是恢复计算代价非常高的情况,适当进行checkpoint会有很大的好处。

sc.setCheckpointDir("hdfs://192.168.XXX.71:9000/wc")
//检查点目录必须存在
val a = sc.parallelize(1 to 5)
a.checkpoint
//将其结果检查点更新
a.collect
res1: Long = 5

context

和SparkContext一样返回创建RDD时候的SparkContext

scala> rdd3.context
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@41aa99fc
scala> rdd3.sparkContext
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@41aa99fc

sparkContext

同上

count — 统计个数

print(rdd1.count)

countByValue — 统计一个key对应的多个value个数

rdd3.countByValue
res101: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)

dependencies—依赖

查看当前RDD是否有依赖,可以帮助恢复重建RDD
1、没有依赖

rdd3.dependencies
res103: Seq[org.apache.spark.Dependency[_]] = List()

2、有一个依赖

rdd3.map(_+2)
res104: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[75] at map at <console>:26
res104.dependencies
res105: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@27917a6b)

distinct — 去重

rdd3.distinct().foreach(println)

filter — 过滤

rdd3.filter(_%2==0).foreach(println)

first — 第一个元素

print(rdd3.first)

flatMap — 扁平化 同Scala

操作类似map,但是其粒度更小

fold — 同Scala

有初始值,只有一个函数

foreach — 遍历

foreachAsync — 异步遍历

foreachPartition — 一个分区执行一次遍历

foreachPartitionAsync —同理异步

getNumPartitions — 获取分区数

print(rdd3.getNumPartitions)

partitions.size — 获取分区数

同上

getStorageLevel — 存储等级

rdd3.getStorageLevel
res102: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

glom — 合并

将RDD的每一个分区作为一个单独的包装

val rg = rdd3.groupBy(_%2)
rg.glom.foreach(x=>println(x.mkString(",")))

groupBy — 分组

返回的是分区号和迭代器

rdd3.groupBy(_%2).collect
res15: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 4, 10, -2)), (1,CompactBuffer(9)))

groupByKey — 按key分组

写wordcount的时候有用到,不需要参数,

rdd4.map(x=>(x.length,x))
res17: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:26

res17.groupByKey.collect
res19: Array[(Int, Iterable[String])] = Array((6,CompactBuffer(python)), (10,CompactBuffer(helloWorld, javascript)), (5,CompactBuffer(hello, scala)))

histogram - - 直方图

输入的参数buckets可以是一个数字,也可以是一个列表
输出结果为一个元组,元组包含两个列表分别是桶(直方图的边界)和直方图的频数
【注意】:
1、桶必须是排好序的,并且不包含重复元素,至少有两个元素
2、所有直方图的结果集合区右边是开区间,最后一个区间除外。
计算方式 : 一、参数buckets是一个数字的情况: 根据桶的总数来计算
先用排好序的数组的边界值来得出两个桶之间的间距 (9.0-1.1)/5 = 1.58
所以得到第一个元组(直方图的边界) 1.1-2.68 2.68-4.26 以此类推
然后计算每个桶中的频数 : 数组中的数字在桶中的分布情况, 位于第一个区间(1.1~2.68) 中的数字有 1.1、1.2、1.3、2.0、2.1 一共5个 对应第二个数组中的第一个数字

val a = sc.parallelize(List(1.1,1.2,1.3,2.0,2.1,7.4,7.5,7.6,8.8,9.0),3)
a.histogram(5)
res0:(Array[Double], Array[Long]) = (Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0),Array(5, 0, 0, 1, 4))

id — 编号

返回RDD的编号

scala> rdd4.id
res20: Int = 11

scala> rdd3.id
res21: Int = 10

intersection — 交集

返回两个集合中相同元素

scala> rdd1.collect
res22: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd2.collect
res23: Array[Int] = Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

rdd2.intersection(rdd1).collect
res25: Array[Int] = Array(6, 7, 8, 9, 10, 5)

isCheckpointed

检查一个RDD是否有检查点,返回true 或 false

isEmpty

判断RDD是否为空

keyBy

指定一个函数特定产生的数据作为RDD的key

scala> rdd4.collect
res28: Array[String] = Array(hello, scala, python, helloWorld, javascript)

scala> rdd4.keyBy(_.length).collect
res29: Array[(Int, String)] = Array((5,hello), (5,scala), (6,python), (10,helloWorld), (10,javascript))

keys

获取RDD中的元组的key,key可以重复出现,List类型可以使用

l1.keyBy(_.length).keys.collect
res31: Array[Int] = Array(5, 4, 4, 4)

join

join 用于两个key-value类型的RDD的内连接操作,类似于数据库中的内连接。只有两者的key相同时,才会连接

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
//相同的key,就能连接在一起
val d = c.keyBy(_.length)
b.join(d).collect 

res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

fullOuterJoin

类似于数据库中的外连接,一共有 n×m 行,每个元素都要出现。按照key进行分组。

val pairRDD1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("book", 4),("gnu", 12)))
val pairRDD2 = sc.parallelize(List( ("cat",2), ("gnu", 5), ("mouse", 4),("cat", 12)))
pairRDD1.fullOuterJoin(pairRDD2).collect
//pairRDD1中的每个元素都会于 m 个 pairRDD2 个元素连接,形成一个 n×m 行的数据
res0: Array[(String, (Option[Int], Option[Int]))] = Array((gnu,(Some(12),Some(5))), (cat,(Some(2),Some(12))), (cat,(Some(2),Some(2))), (cat,(Some(5),Some(12))), (cat,(Some(5),Some(2))), (book,(Some(4),None)), (mouse,(None,Some(4))))

leftJoin

类似于数据库中的左外连接,以左边作为标准,右边没有的填缺失值,左边没有的右边有,舍弃掉。

val a = sc.parallelize(List(("dog",2),("salmon",2),("rat",1),("elephant",10)),3)
val b = sc.parallelize(List(("dog",2),("salmon",2),("rabbit",1),("cat",7)), 3)
a.leftOuterJoin(b).collect

//左边有的,在结果集中都有,左边没有的,右边都舍弃掉。以左边作为参考标准
res1:Array((rat,(1,None)), (salmon,(2,Some(2))), (elephant,(10,None)), (dog,(2,Some(2))))

rightOuterJoin

rightOuterJoin 类似于数据中的右外连接,以右边的作为参考,要是左边没有,那么就将其补空。右边没有的,左边有,那么就舍弃。

//设置两个key-value集合
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","turkey","wolf","bear"), 3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
//如果右边有相同的key,他们会按照多个key来计算
res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),turkey)), (3,(Some(dog),gnu)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(rat)Learning Spark——RDD常用操作

Spark RDD 的Transformation与Action的常用功能总结(Python版本)

Spark RDD常用算子操作 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

spark rdd分区数据支持哪些压缩格式

scala 常用方法

`map` 和 `reduce` 方法在 Spark RDD 中如何工作?