Spark-Join优化之Broadcast
Posted 0xcafedaddy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-Join优化之Broadcast相关的知识,希望对你有一定的参考价值。
适用场景
- 进行join中至少有一个RDD的数据量比较少(比如几百M,或者1-2G)
- 因为,每个Executor的内存中,都会驻留一份广播变量的全量数据
Broadcast与map进行join代码示例
创建RDD
val list1 = List((jame,23), (wade,3), (kobe,24)) val list2 = List((jame,cave), (wade,bulls), (kobe,lakers)) val rdd1 = sc.makeRDD(list1) val rdd2 = sc.makeRDD(list2)
传统的join
// 传统的join操作会导致shuffle操作。 // 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。 val rdd3 = rdd1.join(rdd2) // 结果如下 scala> rdd1.join(rdd2).collect res27: Array[(String, (Int, String))] = Array((kobe,(24,lakers)), (wade,(3,bulls)), (jame,(23,cave)))
使用Broadcast+map的join操作
// Broadcast+map的join操作,不会导致shuffle操作。 // 使用Broadcast将一个数据量较小的RDD作为广播变量 val rdd2Data = rdd2.collect() val rdd2Bc = sc.broadcast(rdd2Data) // 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。 // 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。 def function(tuple: (String,Int)): (String,(Int,String)) ={ for(value <- rdd2Bc.value){ if(value._1.equals(tuple._1)) return (tuple._1,(tuple._2,value._2.toString)) } (tuple._1,(tuple._2,null)) } // 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。 // 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。 // 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(function(_)) //结果如下,达到了与传统join相同的效果 scala> rdd1.map(function(_)).collect res31: Array[(String, (Int, String))] = Array((jame,(23,cave)), (wade,(3,bulls)), (kobe,(24,lakers)))
以上是关于Spark-Join优化之Broadcast的主要内容,如果未能解决你的问题,请参考以下文章
vue之自行实现派发与广播-dispatch与broadcast
Android入门第48天-静态BroadCast之接受开机广播
Android 逆向整体加固脱壳 ( DEX 优化流程分析 | dvmDexFileOpenPartial | dexFileParse | 脱壳点 | 获取 dex 文件在内存中的首地址 )(代码片