spark的broadcast理解

Posted ZL小屁孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark的broadcast理解相关的知识,希望对你有一定的参考价值。

Broadcast(使用BroadcastManager管理)一般用于处理共享配置文件、常用的数据结构等;但Broadcast不适合存放太大数据,Broadcast不会内存溢出,因为数据保存级别StoreageLevelMEMORY_AND_DISK模式。数据量大会造成网络I/O和单点压力大,因此,当数据量较大时不要使用broadcast,网络成本较大,会适得其反。

广播变量:实际上就是Driver端的变量通过Broadcast方法传输到Executor端,Executor端不能修改广播变量的值,使用广播变量是为了减少Executor端的数据备份,减少Executor端的内存。

如:driver端有100个变量,如果不使用广播变量的话,从driver端发给executor端有100个备份,使用了广播变量,就会只有一个备份,从而可以减少executor端的内存。

注意:
        1).不能将RDD广播出去,可以将RDD的结果广播出去
        2).广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义。

private def writeBlocks(value: T): Int = 
// 在Driver中存储广播变量的副本,以便在Driver上运行任务,不是创建广播变量值得副本
SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
//序列化数据对象,并拆分成多个数据块
val blocks =TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach  case (block, i) =>
//存储数据块到BlockManager
SparkEnv.get.blockManager.putBytes(BroadcastBlockId(id, "piece" + i),block,StorageLevel.MEMORY_AND_DISK_SER,tellMaster = true)
blocks.length

代码:

object BroadcastTest 

  /**首次Broadcast变量在executor中反序列化,broadcast数据是从driver中拉取的,并存储在executor的BlockManager中
    * */
  def main(args: Array[String]) 

    val blockSize = if (args.length > 2) args(2) else "4096"

    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Broadcast Test")
      .config("spark.broadcast.blockSize", blockSize)
      .getOrCreate()

    val sc = spark.sparkContext

    val slices = if (args.length > 0) args(0).toInt else 2
    val num = if (args.length > 1) args(1).toInt else 1000000

    val arr1 = (0 until num).toArray

    for (i <- 0 until 3) 
      println(s"Iteration $i")
      val startTime = System.nanoTime()
      val barr1 = sc.broadcast(arr1)
      val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.length)
      /**不能将RDD广播出去,可以将RDD的结果(如rdd.collect)广播出去;
        * 广播变量在Driver定义,在Executor端不可改变,在Executor端不能定义*/
      observedSizes.collect().foreach(i => println(i))
      println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
    

    spark.stop()
  

 

以上是关于spark的broadcast理解的主要内容,如果未能解决你的问题,请参考以下文章

Spark广播变量broadcast案例

spark中的广播变量broadcast

spark小案例——RDD,broadcast

Spark调优Broadcast广播变量

在 Spark Streaming 作业中获取 broadcast_1 的 broadcast_1_piece0 失败

Spark Broadcast总结