Spark Streaming checkpoint not working after driver restart


Posted: 2015-11-04 06:53:56

Problem description:

I have a simple Spark Streaming application that reads data from RabbitMQ and does some aggregation over 1-minute and 1-hour window intervals, with a batch interval of 30 seconds.

I have a three-node setup with checkpointing enabled; I mounted the same directory on all worker nodes using sshfs to hold the checkpoints.

When I run the Spark Streaming application for the first time, it works fine. I can see the results printed on the console and checkpoint data being written to the shared directory.

But after I kill the driver and restart it, it fails with the following exception:

        ERROR 2015-11-06 08:29:10 org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1446778740000 ms.2
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 506.0 failed 4 times, most recent failure: Lost task 0.3 in stage 506.0 (TID 858, 10.29.23.166): java.lang.Exception: Could not compute split, block input-0-1446778594400 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.10.5.jar:na]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.10.5.jar:na]
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at scala.Option.foreach(Option.scala:236) ~[scala-library-2.10.5.jar:na]
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.10-1.4.1.3.jar:1.4.1.3]
WARN  2015-11-06 08:29:10 org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 507
WARN  2015-11-06 08:29:10 org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 508
WARN  2015-11-06 08:29:10 org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 509.0 (TID 882): java.lang.Exception: Could not compute split, block input-0-1446778622600 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

And the exception keeps repeating.

I am not sending a lot of data to RabbitMQ. When I ran the job for the first time, I only dumped …

I have already tried setting "spark.streaming.unpersist" to "true". My setup has 3 nodes, each with one core allocated to Spark and 512 MB of executor memory per node.

Spark version - 1.4.1 (DSE 4.8)

Stratio RabbitMQ receiver - release 1.0

Code:

def createContext(checkpointDirectory: String, config: Config): StreamingContext = {
    println("Creating new context")

    val conf = new SparkConf(true).setAppName(appName).set("spark.streaming.unpersist","true")

    val ssc = new StreamingContext(conf, Seconds(config.getInt(batchIntervalParam)))
    ssc.checkpoint(checkpointDirectory)
    val isValid = validate(ssc, config)

    if (isValid) {
      val result = runJob(ssc, config)
      println("result is " + result)
    } else {
      println(isValid.toString)
    }

    ssc
  }

  def main(args: Array[String]): Unit = {

    if (args.length < 1) {
      println("Must specify the path to config file ")
      println("Usage progname <path to config file> ")
      return
    }
    val url = args(0)
    logger.info("Starting " + appName)
    println("Got the path as %s".format(url))
    val source = scala.io.Source.fromFile(url)
    val lines = try source.mkString finally source.close()
    val config = ConfigFactory.parseString(lines)
    val directoryPath = config.getString(checkPointParam)

    val ssc = StreamingContext.getOrCreate(directoryPath, () => {
      createContext(directoryPath, config)
    })

    ssc.start()
    ssc.awaitTermination()
  }


  def getRabbitMQStream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
    val rabbitMQHost = config.getString(rabbitmqHostParam)
    val rabbitMQPort = config.getInt(rabbitmqPortParam)
    val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
    println("changing the memory lvel")
    val receiverStream: ReceiverInputDStream[String] =
      RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost, rabbitMQPort, rabbitMQQueue, StorageLevel.MEMORY_AND_DISK_SER)

    receiverStream.start()
    receiverStream
  }

  def getBaseDstream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
    val baseDstream = config.getString(receiverTypeParam) match {
      case "rabbitmq" => getRabbitMQStream(config, ssc)
    }
    baseDstream
  }

  def runJob(ssc: StreamingContext, config: Config): Any = {

    val keyspace = config.getString(keyspaceParam)
    val clientStatsTable = config.getString(clientStatsTableParam)
    val hourlyStatsTable = config.getString(hourlyStatsTableParam)
    val batchInterval = config.getInt(batchIntervalParam)
    val windowInterval = config.getInt(windowIntervalParam)
    val hourlyInterval = config.getInt(hourlyParam)
    val limit = config.getInt(limitParam)

    val lines = getBaseDstream(config, ssc)
    val statsRDD = lines.filter(_.contains("client_stats")).map(_.split(",")(1))

    val parserFunc = getProtobufParserFunction()
    val clientUsageRDD: DStream[((String, String), Double)] = statsRDD.flatMap(x => parserFunc(x))
    val formatterFunc = getJsonFormatterFunc()
    val oneMinuteWindowResult = clientUsageRDD.reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(windowInterval), Seconds(batchInterval))
      .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => (x ++ y))
      .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

    println("Client Usage from rabbitmq ")
    oneMinuteWindowResult.map(x => (x._1, DateTime.now, formatterFunc(x._2))).saveToCassandra(keyspace, clientStatsTable)
    oneMinuteWindowResult.print()

    val HourlyResult = clientUsageRDD.reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
      .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => (x ++ y))
      .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

    HourlyResult.map(x => (x._1, DateTime.now, formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
    HourlyResult.map(x => (x, "hourly")).print()

  }

Please help me resolve this issue.

Comments:

Could you please add the complete exception and the StreamingContext initialization code?

Please find my main code here. I will post the exception.

Answer 1:

You are creating the StreamingContext incorrectly for use with checkpointing.

As you can see here: http://spark.apache.org/docs/1.4.1/streaming-programming-guide.html#how-to-configure-checkpointing, the correct way to instantiate a StreamingContext for use with checkpointing is:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

A new StreamingContext instance is created only if there is no data in the given checkpoint directory.

Also, regarding the checkpointing folder, as far as I know you need HDFS installed in the cluster rather than sharing data between nodes with sshfs:

Configuring checkpointing - If the stream application requires it, then a directory in a Hadoop API compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the checkpoint directory and the streaming application written in a way that checkpoint information can be used for failure recovery.

More information here: http://spark.apache.org/docs/1.4.1/streaming-programming-guide.html#requirements
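
To make that concrete, a checkpoint directory on HDFS would look roughly like the sketch below; the namenode address, path and application name are placeholders rather than values from the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder HDFS location: the namenode host, port and path are not from the question.
val checkpointDirectory = "hdfs://namenode:8020/user/spark/checkpoints/streaming-app"

def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf(true).setAppName("streaming-app")   // placeholder app name
  val ssc = new StreamingContext(conf, Seconds(30))            // 30 s batch interval, as in the question
  // define the DStreams and output operations here
  ssc.checkpoint(checkpointDirectory)   // fault-tolerant storage instead of an sshfs/NFS mount
  ssc
}

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)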

Hope this helps.

Comments:

Sorry, I pasted the wrong code. I have now edited the question and pasted the actual code.

This is the code that causes the exception. I am using NFS for checkpointing.

After enabling the WAL the code works fine, but it has a huge performance impact: performance dropped by 50%.

Answer 2:

Two important points to note when recovering from a checkpoint:

    1. spark.streaming.receiver.writeAheadLog.enable should be set to true to enable the write-ahead log (a configuration sketch follows the code below).

    2. Creating the DStreams, iterating over the RDD batches, writing to HDFS and everything else should be done inside the callback passed to getOrCreate(), before the StreamingContext is returned.

    def functionToCreateContext(): StreamingContext = {
        val ssc = new StreamingContext(...)   // new context
        val lines = ssc.socketTextStream(...) // create DStreams
        // do all stuffs here
        ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
        // or here
        ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // nothing can be done here once the context has been created by restoring from checkpoint
    
    // Start the context
    context.start()
    context.awaitTermination()
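
As a sketch of point 1 (not taken from the original answer), the write-ahead log is enabled through SparkConf before the StreamingContext is created; the application name below is a placeholder:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical configuration; only the WAL property name comes from point 1 above.
    val conf = new SparkConf(true)
      .setAppName("streaming-app")                                   // placeholder app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // enable the receiver write-ahead log

    val ssc = new StreamingContext(conf, Seconds(30))

    // With the WAL enabled, received data is already persisted to fault-tolerant storage, so a
    // non-replicated storage level such as StorageLevel.MEMORY_AND_DISK_SER is typically used for
    // the receiver (the question's RabbitMQ stream already does this).

Because every received block is also written to the write-ahead log, some throughput cost is expected, which may help explain the performance drop mentioned in the comments above.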
    

Here is the related question - "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards

Comments:
