Spark---RDD常用方法
Posted Shall潇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark---RDD常用方法相关的知识,希望对你有一定的参考价值。
前言:RDD算子分为Transformation(转换算子)、Action(行动算子)
转换算子—lazy—只记录操作,只有遇到行动算子才会真正的计算
行动算子—non-lazy
本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行
文章目录
- ++
- aggregate
- cartesian --- 笛卡尔
- coalesce --- 分区
- collect
- checkpoint --- 检查点
- context
- sparkContext
- count --- 统计个数
- countByValue --- 统计一个key对应的多个value个数
- dependencies---依赖
- distinct --- 去重
- filter --- 过滤
- first --- 第一个元素
- flatMap --- 扁平化 同Scala
- fold --- 同Scala
- foreach --- 遍历
- foreachAsync --- 异步遍历
- foreachPartition --- 一个分区执行一次遍历
- foreachPartitionAsync ---同理异步
- getNumPartitions --- 获取分区数
- partitions.size --- 获取分区数
- getStorageLevel --- 存储等级
- glom --- 合并
- groupBy --- 分组
- groupByKey --- 按key分组
- histogram - - 直方图
- id --- 编号
- intersection --- 交集
- isCheckpointed
- isEmpty
- keyBy
- keys
- join
- fullOuterJoin
- leftJoin
- rightOuterJoin
- lookup
- map
- mapPartitions
- mapPartitionsWithIndex
- max --- 最大值
- min --- 最小值
- mean --- 平均值
- name
- setName
- partitions
- persist,cache
- pipe
- popStdev --- 标准差
- popVariance --- 方差
- randomSplit
- reduce
- reduceByKey
- repartition --- 重新分区
- sample --- 抽样
- sampleStdev --- 抽样标准差
- sampleVariance --- 抽样方差
- saveAsObjectFile --- 保存为二进制格式
- saveASTextFile --- 保存为文本格式
- sortBy --- 排序
- stats
- stdev --- 标准差
- subtract --- 差集
- sum --- 求和
- take --- 取前几个元素
- takeOrdered
- takeSample
- toDF
- toDS
- toDebugString
- toJavaRDD
- toLocalIterator
- toString
- top
- treeAggregate
- treeReduce
- union
- unpersist
- variance --- 方差
- zip
- zipPartitions
- zipWithIndex
- zipWithUniqueId
val rdd1 = sc.makeRDD(1 to 10)
val rdd2 = sc.parallelize(5 to 15)
val rdd3 = sc.makeRDD(Array(2,4,4,9,10,-2))
val rdd4 = sc.makeRDD(Array("hello","scala","python","helloWorld","javascript"))
++
两个RDD的组合,和Union作用一样
aggregate
和Scala中的一样
rdd1.aggregate(0)(_+_,_+_).collect
cartesian — 笛卡尔
rdd1.cartesian(rdd2).foreach(println)
coalesce — 分区
小—>大 :true; 大—>小 :false
rdd1.coalesce(3,true)
collect
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res1: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
checkpoint — 检查点
checkpoint 检查点机制,假设你在迭代1000次的计算中在第999次失败了,然后你没有checkpoint,你只能重新开始恢复了,如果恰好你在第998次迭代的时候你做了一个checkpoint,那么你只需要恢复第998次产生的rdd,然后再执行2次迭代完成总共1000的迭代,这样效率就很高,比较适用于迭代计算非常复杂的情况,也就是恢复计算代价非常高的情况,适当进行checkpoint会有很大的好处。
sc.setCheckpointDir("hdfs://192.168.XXX.71:9000/wc")
//检查点目录必须存在
val a = sc.parallelize(1 to 5)
a.checkpoint
//将其结果检查点更新
a.collect
res1: Long = 5
context
和SparkContext一样返回创建RDD时候的SparkContext
scala> rdd3.context
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@41aa99fc
scala> rdd3.sparkContext
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@41aa99fc
sparkContext
同上
count — 统计个数
print(rdd1.count)
countByValue — 统计一个key对应的多个value个数
rdd3.countByValue
res101: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)
dependencies—依赖
查看当前RDD是否有依赖,可以帮助恢复重建RDD
1、没有依赖
rdd3.dependencies
res103: Seq[org.apache.spark.Dependency[_]] = List()
2、有一个依赖
rdd3.map(_+2)
res104: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[75] at map at <console>:26
res104.dependencies
res105: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@27917a6b)
distinct — 去重
rdd3.distinct().foreach(println)
filter — 过滤
rdd3.filter(_%2==0).foreach(println)
first — 第一个元素
print(rdd3.first)
flatMap — 扁平化 同Scala
操作类似map,但是其粒度更小
fold — 同Scala
有初始值,只有一个函数
foreach — 遍历
foreachAsync — 异步遍历
foreachPartition — 一个分区执行一次遍历
foreachPartitionAsync —同理异步
getNumPartitions — 获取分区数
print(rdd3.getNumPartitions)
partitions.size — 获取分区数
同上
getStorageLevel — 存储等级
rdd3.getStorageLevel
res102: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
glom — 合并
将RDD的每一个分区作为一个单独的包装
val rg = rdd3.groupBy(_%2)
rg.glom.foreach(x=>println(x.mkString(",")))
groupBy — 分组
返回的是分区号和迭代器
rdd3.groupBy(_%2).collect
res15: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 4, 10, -2)), (1,CompactBuffer(9)))
groupByKey — 按key分组
写wordcount的时候有用到,不需要参数,
rdd4.map(x=>(x.length,x))
res17: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:26
res17.groupByKey.collect
res19: Array[(Int, Iterable[String])] = Array((6,CompactBuffer(python)), (10,CompactBuffer(helloWorld, javascript)), (5,CompactBuffer(hello, scala)))
histogram - - 直方图
输入的参数buckets可以是一个数字,也可以是一个列表
输出结果为一个元组,元组包含两个列表分别是桶(直方图的边界)和直方图的频数
【注意】:
1、桶必须是排好序的,并且不包含重复元素,至少有两个元素
2、所有直方图的结果集合区右边是开区间,最后一个区间除外。
计算方式 : 一、参数buckets是一个数字的情况: 根据桶的总数来计算
先用排好序的数组的边界值来得出两个桶之间的间距 (9.0-1.1)/5 = 1.58
所以得到第一个元组(直方图的边界) 1.1-2.68 2.68-4.26 以此类推
然后计算每个桶中的频数 : 数组中的数字在桶中的分布情况, 位于第一个区间(1.1~2.68) 中的数字有 1.1、1.2、1.3、2.0、2.1 一共5个 对应第二个数组中的第一个数字
val a = sc.parallelize(List(1.1,1.2,1.3,2.0,2.1,7.4,7.5,7.6,8.8,9.0),3)
a.histogram(5)
res0:(Array[Double], Array[Long]) = (Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0),Array(5, 0, 0, 1, 4))
id — 编号
返回RDD的编号
scala> rdd4.id
res20: Int = 11
scala> rdd3.id
res21: Int = 10
intersection — 交集
返回两个集合中相同元素
scala> rdd1.collect
res22: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd2.collect
res23: Array[Int] = Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
rdd2.intersection(rdd1).collect
res25: Array[Int] = Array(6, 7, 8, 9, 10, 5)
isCheckpointed
检查一个RDD是否有检查点,返回true 或 false
isEmpty
判断RDD是否为空
keyBy
指定一个函数特定产生的数据作为RDD的key
scala> rdd4.collect
res28: Array[String] = Array(hello, scala, python, helloWorld, javascript)
scala> rdd4.keyBy(_.length).collect
res29: Array[(Int, String)] = Array((5,hello), (5,scala), (6,python), (10,helloWorld), (10,javascript))
keys
获取RDD中的元组的key,key可以重复出现,List类型可以使用
l1.keyBy(_.length).keys.collect
res31: Array[Int] = Array(5, 4, 4, 4)
join
join 用于两个key-value类型的RDD的内连接操作,类似于数据库中的内连接。只有两者的key相同时,才会连接
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
//相同的key,就能连接在一起
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
fullOuterJoin
类似于数据库中的外连接,一共有 n×m 行,每个元素都要出现。按照key进行分组。
val pairRDD1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("book", 4),("gnu", 12)))
val pairRDD2 = sc.parallelize(List( ("cat",2), ("gnu", 5), ("mouse", 4),("cat", 12)))
pairRDD1.fullOuterJoin(pairRDD2).collect
//pairRDD1中的每个元素都会于 m 个 pairRDD2 个元素连接,形成一个 n×m 行的数据
res0: Array[(String, (Option[Int], Option[Int]))] = Array((gnu,(Some(12),Some(5))), (cat,(Some(2),Some(12))), (cat,(Some(2),Some(2))), (cat,(Some(5),Some(12))), (cat,(Some(5),Some(2))), (book,(Some(4),None)), (mouse,(None,Some(4))))
leftJoin
类似于数据库中的左外连接,以左边作为标准,右边没有的填缺失值,左边没有的右边有,舍弃掉。
val a = sc.parallelize(List(("dog",2),("salmon",2),("rat",1),("elephant",10)),3)
val b = sc.parallelize(List(("dog",2),("salmon",2),("rabbit",1),("cat",7)), 3)
a.leftOuterJoin(b).collect
//左边有的,在结果集中都有,左边没有的,右边都舍弃掉。以左边作为参考标准
res1:Array((rat,(1,None)), (salmon,(2,Some(2))), (elephant,(10,None)), (dog,(2,Some(2))))
rightOuterJoin
rightOuterJoin 类似于数据中的右外连接,以右边的作为参考,要是左边没有,那么就将其补空。右边没有的,左边有,那么就舍弃。
//设置两个key-value集合
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","turkey","wolf","bear"), 3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
//如果右边有相同的key,他们会按照多个key来计算
res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),turkey)), (3,(Some(dog),gnu)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(rat)Learning Spark——RDD常用操作
Spark RDD 的Transformation与Action的常用功能总结(Python版本)
Spark RDD常用算子操作 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin