Spark Streaming Job 不可恢复

Posted

技术标签:

【中文标题】Spark Streaming Job 不可恢复【英文标题】:Spark Streaming Job is not recoverable 【发布时间】:2017-11-26 18:30:01 【问题描述】:

我正在使用带有初始 RDD 的 mapWithState 的火花流作业。重新启动应用程序并从检查点恢复时失败并出现错误:

此 RDD 缺少 SparkContext。它可能发生在以下情况:

    RDD 转换和动作不是由驱动程序调用的,而是在其他转换中调用的;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。 当 Spark Streaming 作业从检查点恢复时,如果在 DStream 操作中使用了对未由流作业定义的 RDD 的引用,则会遇到此异常。有关详细信息,请参阅 SPARK-13​​758

https://issues.apache.org/jira/browse/SPARK-13758 中描述了这种行为,但并未真正描述如何解决它。我的 RDD 不是由流式作业定义的,但我仍然需要它。

这是我的图表的示例:

class EventStreamingApplication 
  private val config: Config = ConfigFactory.load()
  private val sc: SparkContext = 
    val conf = new SparkConf()
      .setAppName(config.getString("streaming.appName"))
      .set("spark.cassandra.connection.host", config.getString("streaming.cassandra.host"))
    val sparkContext = new SparkContext(conf)
    System.setProperty("com.amazonaws.services.s3.enableV4", "true")
    sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sparkContext
  

  def run(): Unit = 
    // streaming.eventCheckpointDir is an S3 Bucket
    val ssc: StreamingContext = StreamingContext.getOrCreate(config.getString("streaming.eventCheckpointDir"), createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  

  def receiver(ssc: StreamingContext): DStream[Event] = 
    RabbitMQUtils.createStream(ssc, Map(
      "hosts" -> config.getString("streaming.rabbitmq.host"),
      "virtualHost" -> config.getString("streaming.rabbitmq.virtualHost"),
      "userName" -> config.getString("streaming.rabbitmq.user"),
      "password" -> config.getString("streaming.rabbitmq.password"),
      "exchangeName" -> config.getString("streaming.rabbitmq.eventExchange"),
      "exchangeType" -> config.getString("streaming.rabbitmq.eventExchangeType"),
      "queueName" -> config.getString("streaming.rabbitmq.eventQueue")
    )).flatMap(EventParser.apply)
  

  def setupStreams(ssc: StreamingContext): Unit = 
    val events = receiver(ssc)
    ExampleJob(events, sc)
  

  private def createStreamingContext(): StreamingContext = 
    val ssc = new StreamingContext(sc, Seconds(config.getInt("streaming.batchSeconds")))
    setupStreams(ssc)
    ssc.checkpoint(config.getString("streaming.eventCheckpointDir"))
    ssc
  


case class Aggregation(value: Long) // Contains aggregation values

object ExampleJob 
  def apply(events: DStream[Event], sc: SparkContext): Unit = 
    val aggregations: RDD[(String, Aggregation)] = sc.cassandraTable('...', '...').map(...) // some domain class mapping
    val state = StateSpec
      .function((key, value, state) => 
        val oldValue = state.getOption().map(_.value).getOrElse(0)
        val newValue = oldValue + value.getOrElse(0)
        state.update(Aggregation(newValue))
        state.get
      )
      .initialState(aggregations)
      .numPartitions(1)
      .timeout(Seconds(86400))
    events
      .filter(...) // filter out unnecessary events
      .map(...) // domain class mapping to key, event dstream
      .groupByKey()
      .map(i => (i._1, i._2.size.toLong))
      .mapWithState(state)
      .stateSnapshots()
      .foreachRDD(rdd => 
        rdd.saveToCassandra(...)
      )
  

抛出的堆栈跟踪是:

Exception in thread "main" org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
  at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:534)
  at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  ...
  <991 lines omitted>
  ...
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
  at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)
  at com.example.spark.EventStreamingApplication.run(EventStreamingApplication.scala:31)
  at com.example.spark.EventStreamingApplication$.main(EventStreamingApplication.scala:63)
  at com.example.spark.EventStreamingApplication.main(EventStreamingApplication.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

【问题讨论】:

你能添加你的 Spark 图表吗? 代码?还是来自 UI 的东西? 代码,实际的 Spark DAG。 它很大,但我会把基本的东西放在一起并发布。 您能否也添加您在尝试恢复时看到的堆栈跟踪? 【参考方案1】:

似乎在 spark 尝试恢复时,没有选择正确的最新检查点文件。因为这个不正确的 RDD 被引用。

spark 2.1.1 版似乎受到影响,因为它不在固定版本列表中。

请参考以下链接获取尚未指定修复版本的 apache 文档。

https://issues.apache.org/jira/browse/SPARK-19280

在我看来,您可以尝试探索自动/手动解决方案,您可以在重新启动 spark 作业时指定最新的检查点文件。

我知道这没有多大帮助,但我认为最好向您解释此问题的根本原因以及解决该问题的当前开发以及我对可能解决方案的看法。

【讨论】:

这真的是同一个问题吗?对他们来说,这似乎只是偶尔发生。我真的不敢相信整个 mapWithState 都坏了。

以上是关于Spark Streaming Job 不可恢复的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

Spark Streaming源码解读之State管理之UpdataStateByKey和MapWithState解密

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

6.Spark streaming技术内幕 : Job动态生成原理与源码解析

Spark Streaming源码解读之Job详解

第6课:Spark Streaming源码解读之Job动态生成和深度思考