RDD——transformation_value类型
Posted chxyshaodiao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD——transformation_value类型相关的知识,希望对你有一定的参考价值。
map(func)
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成。有多少个元素,func就被执行多少次。
mapPartitions(func)
类似于map,但是,map函数是独立地在RDD的每一个分区上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U](批量地接受数据,批量地返回数据)。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val rawData: RDD[Int] = sc.parallelize(Array[Int](1, 2, 3)) val processedWithMap: RDD[Int] = rawData.map(_ * 2) val processedWithMappartition: RDD[Int] = rawData.mapPartitions(x => x.map(_ * 2)) processedWithMap.collect().foreach(println) processedWithMappartition.collect().foreach(println) }
着两个函数都能正确地返回数据处理结果:
2
4
6
map与mapPartitions函数的比较:
map函数是以元素为单位,一条一条地处理数据。mapPartitions函数是以分区为单位,一个分区一个分区地处理数据。
从线程间通信的角度来看,map函数是一条一条的交换数据,通信效率比较低;而mapPartitions函数是一个分区一个分区地交换数据,通信效率比较高,但是若分区的数据未处理完,一整个分区的数据都得不到释放,可能导致OOM问题。
mapPartitionsWithIndex(func)
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];(一个是int数据类型,一个是遍历器数据类型);这个index的传入为使得unc函数可以访问分区索引。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val rdd = sc.parallelize(Array("one","two","three","four"),4) val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) indexRdd.foreach(println) }
flatMap(func)
类似于map,但是每一个输入元素可以被映射为0或多个输出元素。它的函数类型是f: T => TraversableOnce[U]
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val raw: RDD[String] = sc.parallelize(Array(("hello spark"), ("hello scala"))) val splitor: RDD[String] = raw.flatMap(_.split(" ")) splitor.collect().foreach(println); }
打印结果为:
hello
spark
hello
scala
glom()
将每一个分区形成一个数组,形成新的RDD类型RDD[Array[T]],这个rdd中的每个元素都是一个数组类型,数组中包含的是相应分区的所有元素。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.parallelize(1 to 16, 4) val processed: RDD[Array[Int]] = raw.glom() val results: Array[Array[Int]] = processed.collect() for(result <- results){ result.foreach(print) println() } }
打印结果为:
1234
5678
9101112
13141516
groupBy(func)
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。函数类型为f: T => K,也就是根据计算结果的不同将数据投入到不同的分区。其中有shuffle过程。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.parallelize(1 to 10) val processed: RDD[(Int, Iterable[Int])] = raw.groupBy(_ % 2) processed.saveAsTextFile("E:/idea/spark2/out/groupby") }
分别查看两个分区文件:(0,CompactBuffer(2, 4, 6, 8, 10)) (1,CompactBuffer(1, 3, 5, 7, 9))
filter(func)
过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。函数类型为:f: T => Boolean
问题是:该函数是否存在shuffle过程
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.parallelize(1 to 16,4) raw.saveAsTextFile("E:/idea/spark2/out/filter_before") val processed: RDD[Int] = raw.filter(_ % 2 == 0) processed.saveAsTextFile("E:/idea/spark2/out/filter_after") }
查看文件,每个对应的分区文件在过滤之后元素变少,但是元素并没有出现跨文件的移动。据此判断filter没有shuffle过程。
sample(withReplacement, fraction, seed)
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.parallelize(0 to 9) raw.sample(true,4,3).collect().foreach(println) println("----------") raw.sample(true,0.4,3).collect().foreach(println) println("----------") raw.sample(false,0.5,3).collect().foreach(println)
println("----------")
raw.sample(false,0.5,3).collect().foreach(println) } }
//最后两次打印的结果是一样的;这说明,固定这三个参数,每次随机抽样的结果是一定的。
@param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD‘s size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be greater
* than or equal to 0
* @param seed seed for the random number generator
*
/*
种子:根据这个种子,结合给定的算法,会生成一个随机数序列,由seed生成A,再由A生成B,由B生成C。。。种子一定,后面的随机数也是一定的
但是这里的fraction参数如何影响样本容量。
*/
coalesce(numPartitions) 案例
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
如何缩减:将分区中的数据简单地加以合并,因此可能会导致数据在分区间的不平衡,也就是数据倾斜。
部分函数源代码如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
从中可以看出:该函数默认的shuffle机制是关闭状态,也就是简单地将一个分区的数据转移到另外一个分区。比如原先有4个分区1,2,3,4,现在将分区数缩减为2,那么1,2合并为1个分区,3,4合并为一个分区。由于没有shuffle过程的存在,转换速度快,但是可能会导致数据倾斜问题,比如,两个分区数据量比较大,合并为一个大分区,而两外两个分区数据量小,合并为一个小分区。
下面来验证这一机制,紧接着filter算子的案例:
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.parallelize(1 to 16,4) val processed: RDD[Int] = raw.filter(_ % 2 == 0) val coalsesced: RDD[Int] = processed.coalesce(2) coalsesced.saveAsTextFile("E:/idea/spark2/out/coalesce_afterwithshuffle") }
filter之后,四个分区的数据依次为:2 4,6 8,10 12,14 16,
在coalesce之后,两个分区的数据依次为:2 4 6 8 ,10 12 14 16;
启动coalesce的shuffle机制:
val coalsesced: RDD[Int] = processed.coalesce(2,true)
在coalesce之后,两个分区的数据依次为:2 6 10 14,4 8 12 16;
repartition(numPartitions)
根据分区数,重新通过网络随机洗牌所有数据。
部分源代码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
它在底层调用了coalsesce函数,并且默认开启了shuffle机制。
sortBy(func,[ascending], [numTasks])
使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。存在shuffle
def sortBy[K]( f: (T) => K,//根据数据,得到排序的key ascending: Boolean = true, numPartitions: Int = this.partitions.length)//如果没有指定分区数,采用RDD之前的分区数
示例代码如下:
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.parallelize(Array(4, 8, 7, 5, 9, 2, 1, 6, 3), 4) val sorted: RDD[Int] = raw.sortBy(x => x, true) sorted.saveAsTextFile("E:/idea/spark2/out/sorted") }
各个文件中的结果为:1 2 3,4 5,6 7,8 9
以上是关于RDD——transformation_value类型的主要内容,如果未能解决你的问题,请参考以下文章