spark的broadcast理解
Posted ZL小屁孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark的broadcast理解相关的知识,希望对你有一定的参考价值。
Broadcast(使用BroadcastManager管理)一般用于处理共享配置文件、常用的数据结构等;但Broadcast不适合存放太大数据,Broadcast不会内存溢出,因为数据保存级别StoreageLevel是MEMORY_AND_DISK模式。数据量大会造成网络I/O和单点压力大,因此,当数据量较大时不要使用broadcast,网络成本较大,会适得其反。
广播变量:实际上就是Driver端的变量通过Broadcast方法传输到Executor端,Executor端不能修改广播变量的值,使用广播变量是为了减少Executor端的数据备份,减少Executor端的内存。
如:driver端有100个变量,如果不使用广播变量的话,从driver端发给executor端有100个备份,使用了广播变量,就会只有一个备份,从而可以减少executor端的内存。
注意:
1).不能将RDD广播出去,可以将RDD的结果广播出去
2).广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义。
private def writeBlocks(value: T): Int =
// 在Driver中存储广播变量的副本,以便在Driver上运行任务,不是创建广播变量值得副本
SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
//序列化数据对象,并拆分成多个数据块
val blocks =TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach case (block, i) =>
//存储数据块到BlockManager
SparkEnv.get.blockManager.putBytes(BroadcastBlockId(id, "piece" + i),block,StorageLevel.MEMORY_AND_DISK_SER,tellMaster = true)
blocks.length
代码:
object BroadcastTest
/**首次Broadcast变量在executor中反序列化,broadcast数据是从driver中拉取的,并存储在executor的BlockManager中
* */
def main(args: Array[String])
val blockSize = if (args.length > 2) args(2) else "4096"
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Broadcast Test")
.config("spark.broadcast.blockSize", blockSize)
.getOrCreate()
val sc = spark.sparkContext
val slices = if (args.length > 0) args(0).toInt else 2
val num = if (args.length > 1) args(1).toInt else 1000000
val arr1 = (0 until num).toArray
for (i <- 0 until 3)
println(s"Iteration $i")
val startTime = System.nanoTime()
val barr1 = sc.broadcast(arr1)
val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.length)
/**不能将RDD广播出去,可以将RDD的结果(如rdd.collect)广播出去;
* 广播变量在Driver定义,在Executor端不可改变,在Executor端不能定义*/
observedSizes.collect().foreach(i => println(i))
println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
spark.stop()
以上是关于spark的broadcast理解的主要内容,如果未能解决你的问题,请参考以下文章