为啥运行数天后在 S​​park Streaming 中读取广播变量会出现异常?

Posted

技术标签:

【中文标题】为啥运行数天后在 S​​park Streaming 中读取广播变量会出现异常?【英文标题】:Why reading broadcast variable in Spark Streaming got exception after days of running?为什么运行数天后在 S​​park Streaming 中读取广播变量会出现异常? 【发布时间】:2016-02-01 06:49:15 【问题描述】:

我在我的项目中使用 Spark Streaming (Spark V1.6.0) 和 HBase,并且 HBase(HBase V1.1.2) 配置在具有广播变量的执行器之间传输。 Spark Streaming 应用程序一开始工作,大​​约 2 天后会出现异常。

  val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration())
  private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) = 
    hBaseContext.streamBulkIncrement[(String, Int)](
      dStream,
      hTableName,
      (t) => 
        val rowKey = t._1
        val incVal = t._2
        val increment = new Increment(Bytes.toBytes(rowKey))
        increment.addColumn(Bytes.toBytes(hFamily), Bytes.toBytes(columnName), incVal)
        increment
      , batchSize)
  

HBaseContext 的整个源文件可以在HBaseContext.scala 找到,一些sn-ps 可以在下面找到。

运行几天后,会出现异常,堆栈跟踪是:

Unable to getConfig from broadcast
16/02/01 10:08:10 ERROR Executor: Exception in task 3.0 in stage 187175.0 (TID 561527)

逻辑如下:

    使用配置(HBaseContext)创建HBaseContext并广播配置(如果指定了文件路径,则将配置保存到文件中) 在连接HBase之前,首先会检查config字段是否为null,如果是则从指定文件恢复,如果没有指定文件路径,则从广播变量恢复。

从广播变量恢复配置时会出现问题,并且在“configBroadcast.value.value”中从广播读取值时发生异常。

我猜如果 master 失败,Spark Streaming 是否不会恢复广播变量,而 getOrCreate() 用于获取 SparkStreaming 实例。我对 HBaseContext.scala 源代码更好奇,该文件优先于广播变量来恢复值。那么在 Spark Streaming 中使用广播的最佳实践是什么?我是否需要将它们存储在文件中,比如 HDFS 中的文件?

class HBaseContext(@transient sc: SparkContext, @transient config: Configuration, val tmpHdfsConfgFile: String = null) extends Serializable
    @transient var tmpHdfsConfiguration: Configuration = config

    val broadcastedConf = sc.broadcast(new SerializableWritable(config))

    if(tmpHdfsConfgFile != null && config != null)
      // save config to file
    

    private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = 

      if (tmpHdfsConfiguration != null) 
        tmpHdfsConfiguration
       else if (tmpHdfsConfgFile != null) 
        // read config from file

        tmpHdfsConfiguration
      
      if (tmpHdfsConfiguration == null) 
        try 
          // Exception happens here!!!
          tmpHdfsConfiguration = configBroadcast.value.value
          tmpHdfsConfiguration
         catch 
          case ex: Exception => 
            println("Unable to getConfig from broadcast")
          
        
      
    tmpHdfsConfiguration
  

【问题讨论】:

【参考方案1】:

由于某种原因重新启动 spark 作业后,广播变量会被重置。或者驱动程序在作业失败后与尝试重新关联。

在流式作业的情况下,要使用广播变量,应该在创建 StreamingContext 之前从 sprarkContext 初始化广播。这将确保在流式传输开始时广播变量可用。

JavaSparkContext javaSparkContext = createSparkContext();

Broadcast<BroadcastContext> broadcastContext = getBroadcastContext(javaSparkContext);

JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate(sparkCheckPointDir,
                () -> processor.create(sparkCheckPointDir, javaSparkContext));

【讨论】:

以上是关于为啥运行数天后在 S​​park Streaming 中读取广播变量会出现异常?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 ARKit 应用程序在几天后停止工作?

为啥刷新令牌会在 14 天后过期

为啥我的 Google 云端硬盘刷新令牌会在 7 天后过期?

为啥我的VS2005用几天后就老是出现加载项目失败啊.我把系统还原了.从新安装用几天后又是这样.

nginx+php运行几天后偶尔会出现404错误

Park Visit(树的直径)