Spark Streaming metadata checkpoint

Posted 世界那么大,我想去看看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming metadata checkpoint相关的知识,希望对你有一定的参考价值。

一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中, 以使系统从故障中恢复。

  • Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括
    • Configuration :创建Spark Streaming应用程序的配置信息
    • DStream operations :定义Streaming应用程序的操作集合
    • Incomplete batches:操作存在队列中的未完成的批
  • Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于之前 批的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。

元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。

 

Metadata checkpointing

相关代码:

 
def createContext(checkpointDirectory: String)
: StreamingContext = {
    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("DynamicRange")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(8))
    ssc.checkpoint(checkpointDirectory)
    ...
    //你的kafka streaming的相关代码最好放在这里,不然有可能抛异常:spark checkpoint KafkaInputDStream has not been initialize
    //create kafka stream
    val fullLines = KafkaUtils.createStream(ssc, SystemConfig.config.kafkaZkQuorum, SystemConfig.config.kafkaGroup, topicMap);
    //parse data string
    val valueLines = fullLines.map(_._2)
    ..
    ssc
}

def main(args: Array[String]) {
    var ssc: StreamingContext = null
    try {
        ssc = StreamingContext.getOrCreate(".", () => {
            println("get context fail, try to create a new one.")
            createContext(".")
        })
    } catch{
        case e:Exception =>{
            println("get context exception, try to create a new one.")
            ssc = createContext(".")
        }
    }

    ssc.start()
    ssc.awaitTermination()
}
 

注意:

  1. kafka streaming的相关代码最好放在createContext里面,不然有可能抛异常:spark checkpoint KafkaInputDStream has not been initialize。
  2. 不同版本之间的Spark Driver是不能从文件中恢复的,所以这里我用try catch如果有异常就新建一个context。

以上是关于Spark Streaming metadata checkpoint的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming使用Kafka保证数据零丢失

Spark Streaming和Kafka整合保证数据零丢失

Spark 系列(十六)—— Spark Streaming 整合 Kafka

spark streaming kafka example

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一