Spark Key Operators
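1. reduceByKey
How it works
reduceByKey first pre-aggregates the values of each key inside every partition (a map-side combine) and only then performs the final aggregation across partitions after the shuffle. The benefit is that less data travels over the network. It suits cases where the values of a key are reduced to a single value of the same type.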
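The two-stage behaviour described above can be imitated without a cluster. The following is a minimal sketch in plain Scala collections, not Spark's implementation: the names `ReduceByKeySketch` and `reduceLocal`, and the hand-made partition layout, are assumptions made for illustration.

```scala
object ReduceByKeySketch {
  // Hypothetical helper: merge the values of each key with f, like a local reduceByKey.
  def reduceLocal[K, V](records: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    records.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Stage 1: pre-aggregate inside every partition.
  // Stage 2: merge the per-partition partial results (this is what would cross the network).
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] = {
    val partials: Seq[(K, V)] = partitions.flatMap(p => reduceLocal(p)(f).toSeq)
    reduceLocal(partials)(f)
  }

  def main(args: Array[String]): Unit = {
    // The article's data as (x, 1) pairs, split into three partitions by hand (assumed layout).
    val partitions: Seq[Seq[(Int, Int)]] =
      Seq(Seq(1, 2, 5), Seq(7, 8, 9), Seq(3, 4, 4, 5)).map(_.map(x => (x, 1)))
    val counts = reduceByKey(partitions)(_ + _)
    // prints: (1,1), (2,1), (3,1), (4,2), (5,2), (7,1), (8,1), (9,1)
    println(counts.toSeq.sortBy(_._1).mkString(", "))
  }
}
```

Because the merge function is applied both inside a partition and across partitions, it should be associative and commutative, which is why addition works here.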
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Test11 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.parallelize(List(1, 2, 5, 7, 8, 9, 3, 4, 4, 5), 3)
    // Turn every element into a (key, 1) pair so the values can be summed per key
    val rdds: RDD[(Int, Int)] = rdd.map(x => (x, 1))
    // Default partitioner: print the contents of each result partition
    rdds.reduceByKey(_ + _).glom().map(_.toList).collect().foreach(println)
    println("*******************")
    // Same aggregation with an explicit HashPartitioner of 3 partitions
    rdds.reduceByKey(new HashPartitioner(3), _ + _).glom().map(_.toList).collect().foreach(println)
    sc.stop()
  }
}
/**
 * Scala's aggregation operators (reduce, foldLeft) collapse a collection into a single value.
 * Spark's reduceByKey aggregates per key instead.
 */
/**
 * With the default partitioner, the number of partitions comes from spark.default.parallelism
 * when it is set; otherwise it is the largest number of partitions among the upstream RDDs.
 * Unless an upstream RDD already carries a suitable partitioner, a HashPartitioner is used.
 * The Spark source below shows the rule:
 *
 * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
 *
 * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
 * as the default partitions number, otherwise we'll use the max number of upstream partitions.
 *
 * When available, we choose the partitioner from rdds with maximum number of partitions. If this
 * partitioner is eligible (number of partitions within an order of maximum number of partitions
 * in rdds), or has partition number higher than or equal to default partitions number - we use
 * this partitioner.
 *
 * Otherwise, we'll use a new HashPartitioner with the default partitions number.
 *
 * Unless spark.default.parallelism is set, the number of partitions will be the same as the
 * number of partitions in the largest upstream RDD, as this should be least likely to cause
 * out-of-memory errors.
 *
 * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
 *
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }
    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }
    // If the existing max partitioner is an eligible one, or its partitions number is larger
    // than or equal to the default number of partitions, use the existing partitioner.
    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions <= hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }
 */
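Once a HashPartitioner has been chosen, each key is sent to a partition derived from its hash code. The sketch below models that rule in plain Scala as I understand it (non-negative remainder of `hashCode`, with null keys going to partition 0); `HashPartitionSketch`, `nonNegativeMod` and `partitionOf` are illustrative names, not Spark's API.

```scala
object HashPartitionSketch {
  // Non-negative remainder, so negative hash codes still map to a valid partition index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Model of hash partitioning: null keys go to partition 0, all others by hashCode.
  def partitionOf(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k    => nonNegativeMod(k.hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    // Distinct keys of the article's example, distributed over 3 partitions.
    val keys = List(1, 2, 5, 7, 8, 9, 3, 4, 4, 5).distinct
    keys.groupBy(k => partitionOf(k, 3)).toSeq.sortBy(_._1).foreach {
      case (p, ks) => println(s"partition $p: ${ks.sorted.mkString(", ")}")
    }
    // prints:
    // partition 0: 3, 9
    // partition 1: 1, 4, 7
    // partition 2: 2, 5, 8
  }
}
```

For Int keys the hash code is the value itself, so the assignment is simply the key modulo the number of partitions.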
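2. groupByKey
How it works
Like reduceByKey, groupByKey brings together the records that share a key. Unlike reduceByKey, it does not aggregate anything: it only collects the values of each key into a CompactBuffer. Because there is no pre-aggregation within partitions, every record is shuffled, which costs more network bandwidth.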
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test12 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.parallelize(List(1, 2, 5, 7, 8, 9, 3, 4, 4, 5), 3)
    val rdds: RDD[(Int, Int)] = rdd.map(x => (x, 1))
    // Same result as reduceByKey(_ + _), but without map-side pre-aggregation:
    // rdds.groupByKey().mapValues(_.sum).collect().foreach(println)
    // Each record is (key, CompactBuffer(values...))
    rdds.groupByKey().collect().foreach(println)
    // Keep only the grouped values, as plain lists
    rdds.groupByKey().map(x => x._2.toList).collect().foreach(println)
    sc.stop()
  }
}
/**
 * reduceByKey pre-aggregates within each partition.
 * groupByKey shuffles every record, so it needs more network transfer.
 */
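The bandwidth difference can be made concrete by counting how many records would leave the map side in each case. This is a rough model in plain Scala under the assumption that shuffle cost is proportional to record count; `ShuffleVolumeSketch`, `withoutCombine` and `withCombine` are hypothetical names.

```scala
object ShuffleVolumeSketch {
  // groupByKey-like: every (key, value) pair is shuffled as-is.
  def withoutCombine[K, V](partitions: Seq[Seq[(K, V)]]): Int =
    partitions.map(_.size).sum

  // reduceByKey-like: each partition first collapses to one record per distinct key.
  def withCombine[K, V](partitions: Seq[Seq[(K, V)]]): Int =
    partitions.map(_.map(_._1).distinct.size).sum

  def main(args: Array[String]): Unit = {
    // The article's data as (x, 1) pairs, split into three partitions by hand (assumed layout).
    val partitions: Seq[Seq[(Int, Int)]] =
      Seq(Seq(1, 2, 5), Seq(7, 8, 9), Seq(3, 4, 4, 5)).map(_.map(x => (x, 1)))
    println(withoutCombine(partitions)) // prints 10
    println(withCombine(partitions))    // prints 9
  }
}
```

With this small data set only the duplicated key 4 is collapsed, so the saving is one record. The more often a key repeats inside a partition, the larger the gap becomes.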
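3. foldByKey
How it works
foldByKey aggregates per partition in the same way as reduceByKey, but takes one extra fixed value, the zero value. The zero value is not applied to every record: it is folded in once per key in each partition that contains that key, before the per-partition results are merged.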
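val rdd1 = sc.parallelize(List(3, 3, 3, 1, 2, 5, 7, 8, 9, 3, 4, 4, 5, 3), 3)
val rdd2 = sc.parallelize(List(3, 1, 2, 5, 7, 8, 9, 3, 4, 4, 5, 3), 3)
After both are mapped to (x, 1) pairs and folded with foldByKey(4)(_ + _), rdd1 yields (3,13) and rdd2 yields (3,15). rdd1 has two extra 3s at the front, but they land in the same partition as an existing 3, so its five 3s occupy only two partitions and the zero value 4 is added twice (2 × 4 + 5 = 13). In rdd2 the three 3s sit in three different partitions, so 4 is added three times (3 × 4 + 3 = 15). More records therefore give a smaller result.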
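The difference between 13 and 15 can be reproduced without Spark. The sketch below is a plain-Scala model, not Spark's code: `slice` assumes that a local collection is cut into contiguous ranges at positions `i * length / numSlices`, and `foldByKey` here is a hypothetical stand-in that folds each key from the zero value inside every partition before merging.

```scala
object FoldByKeySketch {
  // Assumed slicing rule: contiguous ranges with boundaries at i * length / numSlices.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
    (0 until numSlices).map { i =>
      val start = (i * data.length.toLong / numSlices).toInt
      val end = ((i + 1) * data.length.toLong / numSlices).toInt
      data.slice(start, end)
    }

  // Per partition: fold each key's values starting from zero. Then merge the partials with f.
  def foldByKey[K, V](partitions: Seq[Seq[(K, V)]], zero: V)(f: (V, V) => V): Map[K, V] = {
    val partials: Seq[(K, V)] = partitions.flatMap { p =>
      val perKey = p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(f) }
      perKey.toSeq
    }
    partials.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
  }

  def main(args: Array[String]): Unit = {
    val rdd1 = slice(List(3, 3, 3, 1, 2, 5, 7, 8, 9, 3, 4, 4, 5, 3), 3).map(_.map(x => (x, 1)))
    val rdd2 = slice(List(3, 1, 2, 5, 7, 8, 9, 3, 4, 4, 5, 3), 3).map(_.map(x => (x, 1)))
    val r1 = foldByKey(rdd1, 4)(_ + _)
    val r2 = foldByKey(rdd2, 4)(_ + _)
    println(r1(3)) // prints 13: the 3s occupy two partitions, so 4 is added twice
    println(r2(3)) // prints 15: the 3s occupy three partitions, so 4 is added three times
  }
}
```

A zero value that is truly neutral for the function (0 for addition) makes the result independent of how the data happens to be partitioned.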
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test13 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.parallelize(List(3, 1, 2, 5, 7, 8, 9, 3, 4, 4, 5, 3), 3)
    val rdds: RDD[(Int, Int)] = rdd.map(x => (x, 1))
    // The zero value 4 is folded in once per key in every partition that contains the key
    rdds.foldByKey(4)(_ + _).collect().foreach(println)
    sc.stop()
  }
}
/**
 * Like reduceByKey, foldByKey pre-aggregates within each partition. Unlike reduceByKey, it
 * takes an extra zero value, which is folded into a key's result once in every partition
 * that contains the key. The Spark source:
 *
 * Merge the values for each key using an associative function and a neutral "zero value" which
 * may be added to the result an arbitrary number of times, and must not change the result
 * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
 *
  def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)
    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
    val cleanedFunc = self.context.clean(func)
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }
 *
 * The overload that takes a partition count carries the same documentation and delegates
 * to the method above:
 *
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
  }
 */
4. aggregateByKey
package com.rdd算子复习

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test14 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.parallelize(List(("a", 3), ("b", -2), ("a", 3), ("b", 2), ("d", 3), ("c", 2), ("a", 3), ("b", 2),
      ("d", 3), ("b", 2), ("c", 3), ("b", 2)), 4)
    val result = rdd.aggregateByKey((1, 3))(
      // Within a partition: track (min, max) per key. v is the incoming value, and each
      // record now has the shape (key, (min, max)), seeded by the zero value (1, 3)
      { case ((min, max), v) => (min.min(v), max.max(v)) },
      // Across partitions: add up the per-partition minima and maxima
      { case ((min1, max1), (min2, max2)) => (min1 + min2, max1 + max2) }
    )
    result.collect().foreach(println)
    rdd.aggregateByKey((0, 0))(
      // Within a partition: accumulate (sum, count) per key
      { case ((sum, count), v) => (sum + v, count + 1) },
      // Across partitions: add the partial sums and counts
      { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
    ).map {
      // Pattern-matching function literal: turn (sum, count) into the average
      case (k, (sum, count)) => (k, sum.toDouble / count)
    }.collect().foreach(println)
    sc.stop()
  }
}
/**
 * aggregateByKey suits cases where the within-partition and cross-partition logic differ:
 * 1. Pass a zero value (which may be a tuple) and, optionally, a partition count or a partitioner.
 * 2. Supply the within-partition logic (seqOp).
 * 3. Supply the cross-partition logic (combOp).
 *
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
 * V. Thus, we need one operation for merging a V into a U and one operation for merging two Us,
 * as in scala.TraversableOnce. The former operation is used for merging values within a
 * partition, and the latter is used for merging values between partitions. To avoid memory
 * allocation, both of these functions are allowed to modify and return their first argument
 * instead of creating a new U.
 *
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }
 *
 * The two remaining overloads carry the same documentation and delegate to the method above:
 *
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
 */
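The three steps (zero value, seqOp within a partition, combOp across partitions) can be traced by hand with a plain-Scala model. This is only a sketch: `AggregateByKeySketch` and its `aggregateByKey` are hypothetical stand-ins working on hand-made partitions, and the four-way split of the data is an assumed layout.

```scala
object AggregateByKeySketch {
  // Per partition: fold each key's values into a U starting from zero (seqOp).
  // Across partitions: merge the per-partition Us of each key (combOp).
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    val partials: Seq[(K, U)] = partitions.flatMap { p =>
      val perKey = p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(seqOp) }
      perKey.toSeq
    }
    partials.groupBy(_._1).map { case (k, us) => k -> us.map(_._2).reduce(combOp) }
  }

  // The article's 12 records, cut into 4 partitions of 3 (assumed layout).
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 3), ("b", -2), ("a", 3)),
    Seq(("b", 2), ("d", 3), ("c", 2)),
    Seq(("a", 3), ("b", 2), ("d", 3)),
    Seq(("b", 2), ("c", 3), ("b", 2))
  )

  def main(args: Array[String]): Unit = {
    // (sum, count) per key with the neutral zero value (0, 0)
    val sumCount = aggregateByKey(partitions, (0, 0))(
      { case ((sum, count), v) => (sum + v, count + 1) },
      { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) })
    // prints: (a,(9,3)), (b,(8,6)), (c,(5,2)), (d,(6,2))
    println(sumCount.toSeq.sortBy(_._1).mkString(", "))

    // (min, max) per partition, then summed, seeded with the non-neutral zero value (1, 3)
    val minMax = aggregateByKey(partitions, (1, 3))(
      { case ((min, max), v) => (min.min(v), max.max(v)) },
      { case ((min1, max1), (min2, max2)) => (min1 + min2, max1 + max2) })
    // prints: (a,(2,6)), (b,(1,12)), (c,(2,6)), (d,(2,6))
    println(minMax.toSeq.sortBy(_._1).mkString(", "))
  }
}
```

In the second call the seed (1, 3) takes part in every per-partition min and max, so it shapes the result as much as the data does. A neutral seed such as (Int.MaxValue, Int.MinValue) would avoid that.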
5. combineByKey
package com.rdd算子复习

import org.apache.spark.{SparkConf, SparkContext}

object Test15 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.parallelize(List(("a", 3), ("b", 2), ("a", 3), ("b", 2), ("d", 3), ("c", 2), ("a", 3), ("b", 2),
      ("d", 3), ("b", 2), ("c", 3), ("b", 2)), 2)
    /**
     * Sum the values of each key
     */
    rdd.combineByKey(
      (v: Int) => v,                  // createCombiner: first value of a key in a partition
      (c: Int, v: Int) => c + v,      // mergeValue: within a partition
      (c1: Int, c2: Int) => c1 + c2   // mergeCombiners: across partitions
    ).collect().foreach(println)
    println("***************************")
    /**
     * Take the maximum of each key within every partition, then add the maxima together
     */
    rdd.combineByKey(
      (v: Int) => v,
      (max: Int, v: Int) => max.max(v),
      (max1: Int, max2: Int) => max1 + max2
    ).collect().foreach(println)
    sc.stop()
  }
}
/**
 * combineByKey lets the within-partition and cross-partition aggregation logic differ.
 * The Spark source:
 *
  /*
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see `combineByKeyWithClassTag`
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   * This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see `combineByKeyWithClassTag`
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   */
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      new HashPartitioner(numPartitions))
  }
 */
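The source excerpts in this article show foldByKey and aggregateByKey delegating to combineByKeyWithClassTag with `createCombiner = v => f(zero, v)`. The sketch below models that relationship in plain Scala over hand-made partitions. All names in `CombineByKeySketch` are hypothetical, and treating reduceByKey as the case `createCombiner = v => v` is my reading of the same pattern.

```scala
object CombineByKeySketch {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within a partition: the first value of a key creates the combiner, later values merge in.
    val partials: Seq[(K, C)] = partitions.flatMap { p =>
      val perKey = p.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
      perKey.toSeq
    }
    // Across partitions: merge the per-partition combiners of each key.
    partials.groupBy(_._1).map { case (k, cs) => k -> cs.map(_._2).reduce(mergeCombiners) }
  }

  def reduceByKey[K, V](ps: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](ps)(v => v, f, f)

  def foldByKey[K, V](ps: Seq[Seq[(K, V)]], zero: V)(f: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](ps)(v => f(zero, v), f, f)

  def aggregateByKey[K, V, U](ps: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] =
    combineByKey[K, V, U](ps)(v => seqOp(zero, v), seqOp, combOp)

  // The article's 12 records, cut into 2 partitions of 6 (assumed layout).
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 3), ("b", 2), ("a", 3), ("b", 2), ("d", 3), ("c", 2)),
    Seq(("a", 3), ("b", 2), ("d", 3), ("b", 2), ("c", 3), ("b", 2))
  )

  def main(args: Array[String]): Unit = {
    val sums = combineByKey[String, Int, Int](partitions)(v => v, _ + _, _ + _)
    val maxSums = combineByKey[String, Int, Int](partitions)(v => v, (m, v) => m.max(v), _ + _)
    println(sums.toSeq.sortBy(_._1).mkString(", "))    // prints: (a,9), (b,10), (c,5), (d,6)
    println(maxSums.toSeq.sortBy(_._1).mkString(", ")) // prints: (a,6), (b,4), (c,5), (d,6)
  }
}
```

Seen this way, the operators differ only in how the first value of a key is turned into a combiner and in whether the two merge functions are the same.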
6. sortByKey
Sorts the records by key.
import org.apache.spark.{SparkConf, SparkContext}

object Test16 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.parallelize(List(("a", 3), ("b", 2), ("a", 3), ("b", 2), ("d", 3), ("c", 2), ("a", 3), ("b", 2),
      ("d", 3), ("b", 2), ("c", 3), ("b", 2)), 2)
    // sortByKey sorts in ascending order by default; passing false sorts in descending order
    rdd.sortByKey(false).collect().foreach(println)
    sc.stop()
  }
}