为啥运行数天后在 Spark Streaming 中读取广播变量会出现异常?
Posted
技术标签:
【中文标题】为啥运行数天后在 Spark Streaming 中读取广播变量会出现异常?【英文标题】:Why reading broadcast variable in Spark Streaming got exception after days of running?为什么运行数天后在 Spark Streaming 中读取广播变量会出现异常? 【发布时间】:2016-02-01 06:49:15 【问题描述】:我在我的项目中使用 Spark Streaming (Spark V1.6.0) 和 HBase,并且 HBase(HBase V1.1.2) 配置在具有广播变量的执行器之间传输。 Spark Streaming 应用程序一开始工作,大约 2 天后会出现异常。
val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration())
private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) =
hBaseContext.streamBulkIncrement[(String, Int)](
dStream,
hTableName,
(t) =>
val rowKey = t._1
val incVal = t._2
val increment = new Increment(Bytes.toBytes(rowKey))
increment.addColumn(Bytes.toBytes(hFamily), Bytes.toBytes(columnName), incVal)
increment
, batchSize)
HBaseContext 的整个源文件可以在HBaseContext.scala 找到,一些sn-ps 可以在下面找到。
运行几天后,会出现异常,堆栈跟踪是:
Unable to getConfig from broadcast
16/02/01 10:08:10 ERROR Executor: Exception in task 3.0 in stage 187175.0 (TID 561527)
逻辑如下:
-
使用配置(HBaseContext)创建HBaseContext并广播配置(如果指定了文件路径,则将配置保存到文件中)
在连接HBase之前,首先会检查config字段是否为null,如果是则从指定文件恢复,如果没有指定文件路径,则从广播变量恢复。
从广播变量恢复配置时会出现问题,并且在“configBroadcast.value.value”中从广播读取值时发生异常。
我猜如果 master 失败,Spark Streaming 是否不会恢复广播变量,而 getOrCreate() 用于获取 SparkStreaming 实例。我对 HBaseContext.scala 源代码更好奇,该文件优先于广播变量来恢复值。那么在 Spark Streaming 中使用广播的最佳实践是什么?我是否需要将它们存储在文件中,比如 HDFS 中的文件?
class HBaseContext(@transient sc: SparkContext, @transient config: Configuration, val tmpHdfsConfgFile: String = null) extends Serializable
@transient var tmpHdfsConfiguration: Configuration = config
val broadcastedConf = sc.broadcast(new SerializableWritable(config))
if(tmpHdfsConfgFile != null && config != null)
// save config to file
private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration =
if (tmpHdfsConfiguration != null)
tmpHdfsConfiguration
else if (tmpHdfsConfgFile != null)
// read config from file
tmpHdfsConfiguration
if (tmpHdfsConfiguration == null)
try
// Exception happens here!!!
tmpHdfsConfiguration = configBroadcast.value.value
tmpHdfsConfiguration
catch
case ex: Exception =>
println("Unable to getConfig from broadcast")
tmpHdfsConfiguration
【问题讨论】:
【参考方案1】:由于某种原因重新启动 spark 作业后,广播变量会被重置。或者驱动程序在作业失败后与尝试重新关联。
在流式作业的情况下,要使用广播变量,应该在创建 StreamingContext 之前从 sprarkContext 初始化广播。这将确保在流式传输开始时广播变量可用。
JavaSparkContext javaSparkContext = createSparkContext();
Broadcast<BroadcastContext> broadcastContext = getBroadcastContext(javaSparkContext);
JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate(sparkCheckPointDir,
() -> processor.create(sparkCheckPointDir, javaSparkContext));
【讨论】:
以上是关于为啥运行数天后在 Spark Streaming 中读取广播变量会出现异常?的主要内容,如果未能解决你的问题,请参考以下文章
为啥我的 Google 云端硬盘刷新令牌会在 7 天后过期?