Didi Finance Yang Fan: SortShuffle in Spark

Posted by 大数据那些事




PART 01

SortShuffle in Spark


The two execution mechanisms of SortShuffleManager

SortShuffleManager has two main execution mechanisms: the normal (sort-based) mechanism and the bypass mechanism.

Normal mechanism: (diagram)

Bypass mechanism: (diagram)

The normal mechanism is the default; before Spark 1.2, HashShuffleManager was used instead. The bypass mechanism is used only when the number of shuffle output partitions (dep.partitioner.numPartitions) is no greater than spark.shuffle.sort.bypassMergeThreshold and mapSideCombine (which aggregation operators turn on) is not true, as the source below shows.
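For instance (a minimal sketch; the operators, partition counts, and app name are illustrative, and the threshold is simply the default made explicit), groupByKey does no map-side combine, so with a small enough number of output partitions it takes the bypass path, while reduceByKey turns on mapSideCombine and therefore always goes through the normal sort-based mechanism:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("bypass-demo")
  .set("spark.shuffle.sort.bypassMergeThreshold", "200") // default value, shown explicitly
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))

// 100 output partitions <= 200 and no map-side combine => bypass mechanism
pairs.groupByKey(100).count()

// reduceByKey performs map-side combine (mapSideCombine = true) => normal mechanism
pairs.reduceByKey(_ + _, 100).count()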


private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}

// Create the ShuffleHandle
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient.
    // canUseSerializedShuffle is a helper that decides whether the shuffle can take the
    // optimized serialized path or must fall back to the path that operates on deserialized
    // objects. The serialized path is used only when the serializer supports relocation of
    // its serialized objects, no aggregator is defined, and numPartitions is less than 2^24.
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
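For reference, a simplified sketch of the check behind SortShuffleManager.canUseSerializedShuffle, paraphrased from the comment above (the method name with the Sketch suffix is made up for this illustration, logging is omitted, and 1 << 24 is written inline in place of the named constant):

import org.apache.spark.ShuffleDependency

def canUseSerializedShuffleSketch(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false // the serializer must support relocating its serialized objects
  } else if (dependency.aggregator.isDefined) {
    false // a defined aggregator forces the deserialized (normal) path
  } else if (numPartitions > (1 << 24)) {
    false // the partition id must fit into the 24-bit field of the packed record pointer
  } else {
    true
  }
}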


The impact of spark.shuffle.sort.bypassMergeThreshold


From the process above, raising spark.shuffle.sort.bypassMergeThreshold so that it is greater than or equal to the number of shuffle partitions effectively removes the sorting step for some shuffle operators; it also avoids copying RDD rows before the shuffle, as the source below shows:

private def needToCopyObjectsBeforeShuffle(
    partitioner: Partitioner,
    serializer: Serializer): Boolean = {
  val conf = SparkEnv.get.conf
  val shuffleManager = SparkEnv.get.shuffleManager
  val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
  val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  if (sortBasedShuffleOn) {
    val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
    if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
      false
    } else if (serializer.supportsRelocationOfSerializedObjects) {
      // The serializer supports relocation of its serialized objects
      false
    } else {
      true
    }
  } else {
    true
  }
}
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
  if (needToCopyObjectsBeforeShuffle(part, serializer)) {
    rdd.mapPartitionsInternal { iter =>
      val getPartitionKey = getPartitionKeyExtractor()
      iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
    }
  } else {
    rdd.mapPartitionsInternal { iter =>
      val getPartitionKey = getPartitionKeyExtractor()
      val mutablePair = new MutablePair[Int, InternalRow]()
      iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
    }
  }
}
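The row.copy() in the first branch matters because Spark reuses one mutable row object while iterating over a partition; if the shuffle buffers deserialized objects, keeping a reused reference without copying would silently overwrite earlier records. A toy illustration of that pitfall (plain Scala, not Spark code; MutableRow is a made-up stand-in for the reused InternalRow):

case class MutableRow(var value: Int)

val reused = MutableRow(0)

// Buffering the reused reference without copying: every element points at the
// same object, so all of them end up showing the last value written.
val buffered = (1 to 3).map { i => reused.value = i; reused }
println(buffered.map(_.value)) // Vector(3, 3, 3)

// A defensive copy (analogous to row.copy() above) keeps each record intact.
val copied = (1 to 3).map { i => reused.value = i; reused.copy() }
println(copied.map(_.value))   // Vector(1, 2, 3)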

In the source above, if the serializer does not support relocation of serialized objects, rows may have to be copied before the shuffle; the JavaSerializer that Spark ships with does not support relocation, but KryoSerializer does.
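Building on that, one way to get a relocation-capable serializer for ordinary RDD shuffles is to switch spark.serializer to Kryo; a minimal sketch (the app name is illustrative, and whether a particular shuffle then avoids copies or takes the serialized path still depends on the checks shown earlier):

import org.apache.spark.SparkConf

// KryoSerializer supports relocation of its serialized objects, one of the
// prerequisites for the serialized shuffle path; JavaSerializer does not.
val conf = new SparkConf()
  .setAppName("kryo-shuffle-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")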


