Spark Streaming Checkpoint not working after driver restart
Posted: 2015-11-04 06:53:56

I have a simple Spark Streaming application that reads data from RabbitMQ and does some aggregation over 1-minute and 1-hour window intervals, with a batch interval of 30 seconds.
I have a three-node setup. Checkpointing is enabled, and I have mounted the same directory on all worker nodes with sshfs to hold the checkpoints.

When I run the Spark Streaming application for the first time, it works fine: I can see the results printed on the console, and checkpoints are written to the network directory.

But after I kill the driver and restart it, it fails with the following exception:
ERROR 2015-11-06 08:29:10 org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1446778740000 ms.2
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 506.0 failed 4 times, most recent failure: Lost task 0.3 in stage 506.0 (TID 858, 10.29.23.166): java.lang.Exception: Could not compute split, block input-0-1446778594400 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.10.5.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.10.5.jar:na]
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at scala.Option.foreach(Option.scala:236) ~[scala-library-2.10.5.jar:na]
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) ~[spark-core_2.10-1.4.1.3.jar:1.4.1.3]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.10-1.4.1.3.jar:1.4.1.3]
WARN 2015-11-06 08:29:10 org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 507
WARN 2015-11-06 08:29:10 org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 508
WARN 2015-11-06 08:29:10 org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 509.0 (TID 882): java.lang.Exception: Could not compute split, block input-0-1446778622600 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The exception then keeps repeating.

I am not sending a large amount of data to RabbitMQ; when I run the job for the first time I only dump a small amount of data. I have already tried setting "spark.streaming.unpersist" to "true".
My setup has 3 nodes, each with one core assigned to Spark and 512 MB of executor memory per node.

Spark version - 1.4.1 (DSE 4.8)
Stratio RabbitMQ receiver - release 1.0

Code:
def createContext(checkpointDirectory: String, config: Config): StreamingContext = {
  println("Creating new context")
  val conf = new SparkConf(true).setAppName(appName).set("spark.streaming.unpersist", "true")
  val ssc = new StreamingContext(conf, Seconds(config.getInt(batchIntervalParam)))
  ssc.checkpoint(checkpointDirectory)
  val isValid = validate(ssc, config)
  if (isValid) {
    val result = runJob(ssc, config)
    println("result is " + result)
  } else {
    println(isValid.toString)
  }
  ssc
}

def main(args: Array[String]): Unit = {
  if (args.length < 1) {
    println("Must specify the path to config file ")
    println("Usage progname <path to config file> ")
    return
  }
  val url = args(0)
  logger.info("Starting " + appName)
  println("Got the path as %s".format(url))
  val source = scala.io.Source.fromFile(url)
  val lines = try source.mkString finally source.close()
  val config = ConfigFactory.parseString(lines)
  val directoryPath = config.getString(checkPointParam)
  val ssc = StreamingContext.getOrCreate(directoryPath, () =>
    createContext(directoryPath, config)
  )
  ssc.start()
  ssc.awaitTermination()
}

def getRabbitMQStream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  val rabbitMQHost = config.getString(rabbitmqHostParam)
  val rabbitMQPort = config.getInt(rabbitmqPortParam)
  val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
  println("changing the memory lvel")
  val receiverStream: ReceiverInputDStream[String] =
    RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost, rabbitMQPort, rabbitMQQueue, StorageLevel.MEMORY_AND_DISK_SER)
  receiverStream.start()
  receiverStream
}

def getBaseDstream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  val baseDstream = config.getString(receiverTypeParam) match {
    case "rabbitmq" => getRabbitMQStream(config, ssc)
  }
  baseDstream
}

def runJob(ssc: StreamingContext, config: Config): Any = {
  val keyspace = config.getString(keyspaceParam)
  val clientStatsTable = config.getString(clientStatsTableParam)
  val hourlyStatsTable = config.getString(hourlyStatsTableParam)
  val batchInterval = config.getInt(batchIntervalParam)
  val windowInterval = config.getInt(windowIntervalParam)
  val hourlyInterval = config.getInt(hourlyParam)
  val limit = config.getInt(limitParam)

  val lines = getBaseDstream(config, ssc)
  val statsRDD = lines.filter(_.contains("client_stats")).map(_.split(",")(1))

  val parserFunc = getProtobufParserFunction()
  val clientUsageRDD: DStream[((String, String), Double)] = statsRDD.flatMap(x => parserFunc(x))
  val formatterFunc = getJsonFormatterFunc()

  val oneMinuteWindowResult = clientUsageRDD.reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(windowInterval), Seconds(batchInterval))
    .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
    .reduceByKey((x, y) => (x ++ y))
    .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

  println("Client Usage from rabbitmq ")
  oneMinuteWindowResult.map(x => (x._1, DateTime.now, formatterFunc(x._2))).saveToCassandra(keyspace, clientStatsTable)
  oneMinuteWindowResult.print()

  val HourlyResult = clientUsageRDD.reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
    .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
    .reduceByKey((x, y) => (x ++ y))
    .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

  HourlyResult.map(x => (x._1, DateTime.now, formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
  HourlyResult.map(x => (x, "hourly")).print()
}
Please help me fix this issue.
Comments:

Could you please add the full exception and the StreamingContext initialization code?
Please find my main code here; I will post the exception.

Answer 1:

You are creating the StreamingContext incorrectly for use with checkpointing.
As you can see at http://spark.apache.org/docs/1.4.1/streaming-programming-guide.html#how-to-configure-checkpointing, the correct way to instantiate a StreamingContext that uses checkpointing is:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...) // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory) // set checkpoint directory
  ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
A new StreamingContext instance is created only if there is no data in the given checkpoint directory.
Also, regarding the checkpointing folder: as far as I know, you need HDFS installed in your cluster rather than sharing the directory between nodes with sshfs:

Configuring checkpointing - If the streaming application requires it, then a directory in a Hadoop API compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the checkpoint directory, and the streaming application written in a way that checkpoint information can be used for failure recovery.

More information here: http://spark.apache.org/docs/1.4.1/streaming-programming-guide.html#requirements
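As a minimal sketch (the namenode host, port, and path below are placeholders, not values from the question), pointing the checkpoint directory at a Hadoop-compatible store instead of an sshfs/NFS mount could look like this, reusing the question's createContext and config:

// Illustrative only: the checkpoint directory should live on fault-tolerant,
// Hadoop-compatible storage rather than a locally mounted sshfs/NFS directory.
val checkpointDirectory = "hdfs://namenode-host:8020/user/spark/streaming-checkpoints"

val ssc = StreamingContext.getOrCreate(checkpointDirectory, () =>
  createContext(checkpointDirectory, config)
)
ssc.start()
ssc.awaitTermination()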
Hope it helps.
Comments:

Sorry, I pasted the wrong code; I have now edited the post with the actual code.
This is the code that causes the exception. I am using NFS for checkpointing.
After enabling the WAL the code runs fine, but it has a huge performance impact: performance dropped by 50%.

Answer 2:

Two key points to watch when recovering from a checkpoint:
1. spark.streaming.receiver.writeAheadLog.enable should be set to true to enable the write-ahead log (a configuration sketch follows the example below).
2. Creating the DStream, iterating over the RDD batches, writing to HDFS, and everything else should be done inside the callback passed to getOrCreate(), before the StreamingContext is returned.
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...) // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  // do all stuffs here
  ssc.checkpoint(checkpointDirectory) // set checkpoint directory
  // or here
  ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// nothing can be done here once the context has been created by restoring from checkpoint
// Start the context
context.start()
context.awaitTermination()
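For the first point, here is a minimal sketch of enabling the write-ahead log; appName, batchInterval, and checkpointDirectory are assumed to be defined as in the question, and the WAL files are written under the checkpoint directory, so that directory must be on fault-tolerant storage:

// Sketch only: enable the receiver write-ahead log so received blocks
// can be replayed after a driver restart instead of being lost.
val conf = new SparkConf(true)
  .setAppName(appName)
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(batchInterval))
ssc.checkpoint(checkpointDirectory) // WAL segments are stored under this directory

With the WAL enabled, the documentation suggests a non-replicated, serialized storage level for the receiver such as StorageLevel.MEMORY_AND_DISK_SER (which the question's code already uses), since the log itself provides durability.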
Here is the related question: "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards