spark流检查点恢复非常非常慢

Posted

技术标签:

【中文标题】spark流检查点恢复非常非常慢【英文标题】:spark streaming checkpoint recovery is very very slow 【发布时间】:2016-11-18 08:05:00 【问题描述】: 目标:通过 Spark 流从 Kinesis 读取数据并以 Parquet 格式将数据存储到 S3。 情况: 应用程序最初运行良好,批量运行 1 小时,处理时间平均不到 30 分钟。出于某种原因,假设应用程序崩溃了,我们尝试从检查点重新启动。现在处理需要永远并且不会向前推进。 我们尝试以 1 分钟的批处理间隔测试相同的东西,处理运行良好,批处理完成需要 1.2 分钟。当我们从检查点恢复时,每批大约需要 15 分钟。 注意事项: 我们使用 s3 作为检查点 使用 1 个执行器,每个执行器有 19g 内存和 3 个核心

附上截图:

首次运行 - 检查点恢复之前

尝试从检查点恢复:

Config.scala

object Config 

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = 
    this.sc
  

  def getSqlContext(): HiveContext = 
    this.sqlContext
  






S3Basin.scala

object S3Basin 
  def main(args: Array[String]): Unit = 
    Kinesis.startStreaming(s3basinFunction _)
  

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit =
    streams.foreachRDD(jsonRDDRaw =>
      println(s"Old partitions $jsonRDDRaw.partitions.length")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions $jsonRDD.partitions.length")

      if(!jsonRDD.isEmpty())
        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

        sqlContext.read.json(jsonRDD.map(f=>
          val str = new String(f)
          if(str.startsWith("\"message\""))
            str.substring(11,str.indexOf("@version")-2)
          
          else
            str
          
        )).registerTempTable("events")

        sqlContext.sql(
          """
            |select
            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |*
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)


        sqlContext.dropTempTable("events")
      
    )
  

Kinesis.scala

object Kinesis


  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = 
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentails : BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole)
      new AmazonKinesisClient()
    
    else
      awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentails)
    


    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")


    val kinesisStreams = (0 until Config.numStreams).map  i =>
        println(s"creating stream for $i")
        if(Config.useIAMInstanceRole)
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

        else
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)

        
      

    val unionStreams = streamingContext.union(kinesisStreams)
    streamFunc(unionStreams)

    streamingContext
  


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = 

    val sc = Config.getSparkContext

    if(sc.defaultParallelism < Config.numStreams+1)
      throw  new Exception(s"Number of shards = $Config.numStreams , number of processor = $sc.defaultParallelism")
    

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))


//    sys.ShutdownHookThread 
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    
//

    streamingContext.start()
    streamingContext.awaitTermination()


  





DAG

【问题讨论】:

图片很棒,但我们可以看看你的代码吗?最好是你的 Spark DAG。 @YuvalItzchakov 添加了代码和 DAG 这是迄今为止我在 SO 上看到的最有趣的帖子。感谢所有的截图=D @Kristian 关于我在哪里可以找到更多信息的任何线索? @amit_kumar 我看图片没问题。这可能是您的本地浏览器问题。回到问题上来,您是否刚刚尝试查看您的服务器是否正在交换?即使有 19G 的内存,它也可能会做一些意想不到的事情。 【参考方案1】:

提出了一个 Jira 问题:https://issues.apache.org/jira/browse/SPARK-19304

问题是因为我们每次迭代读取的数据多于所需数据,然后丢弃了这些数据。这可以通过对getResults aws 调用添加限制来避免。

修复:https://github.com/apache/spark/pull/16842

【讨论】:

@interfector 你可能对这个感兴趣 修复它的程度很棒@gaurav-shah !!【参考方案2】:

重新启动失败的驱动程序时,会发生以下情况:

    恢复计算 – 检查点信息用于 重新启动驱动程序,重建上下文并重新启动所有 接收者。 恢复块元数据 – 将要恢复的所有块的元数据 继续处理所必需的将被恢复。 重新生成不完整的作业 - 对于处理的批次 由于失败而没有完成,RDDs和相应的 使用恢复的块元数据重新生成作业。 读取保存在日志中的块 – 执行这些作业时, 块数据直接从预写日志中读取。这恢复 可靠地保存到日志中的所有必要数据。 重新发送未确认的数据 – 未保存到的缓冲数据 失败时的日志将由源再次发送。作为 接收方尚未确认。

由于所有这些步骤都是在驱动程序中执行的,因此您的批次 0 事件需要很长时间。这应该在第一批发生,然后一切都会正常。

参考here。

【讨论】:

正确,我们知道它只发生在需要通过检查点恢复的批次上(不仅仅是第一个),我们还发现它发生在驱动程序上,但是如何解决这个问题是问题 Checkpointing真的很慢,你试过KryoSerialization吗。还可以考虑使用 Datasets databricks.com/blog/2016/01/04/…,它具有更快的 ser/desr 和 Encoders。 慢是相对的,检查点需要将数据序列化到由接收者完成的检查点位置(额外工作)。恢复后重新计算对数据的所有操作,这可能涉及网络流量,因为现在数据源是您的检查点位置。在 SPARK-JIRA issues.apache.org/jira/browse/… 上将您的号码与此 cmets 匹配 我们不需要在 1.5 spark 的 Kinesis 上启用 WAL,它可以使用 Kinesis 作为源并从中恢复。检查点只包含元数据,编写起来并不昂贵。没有 ser/de ,没有数据写入。只有元数据issues.apache.org/jira/browse/SPARK-9215 @GauravShah 您是否创建了 jira 问题?我也很有趣关注这张票。我有流式传输选项卡,在重新启动流式传输后出现 45,并且在 1 小时后批处理需要越来越长的时间导致 oom。因为当我没有从检查点重新启动时我还没有 oom,所以我想知道引擎盖内有什么附加内容。【参考方案3】:

我之前也遇到过类似的问题,我的应用程序越来越慢。

使用rdd后尝试释放内存,调用rdd.unpersist()https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)

spark.streaming.backpressure.enabledtrue

http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval

http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements

另外,请检查您的 locality 设置,可能有太多数据移动。

【讨论】:

应用程序仅在恢复中花费时间,而不是在常规处理中。如果我的内存不足,rdd.unpersist 会有所帮助,但事实并非如此。如果我不能像数据传入一样快地使用数据,那么背压很有用,但我实际上可以做到。

以上是关于spark流检查点恢复非常非常慢的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming Job 不可恢复

为啥 spark 无法使用 getOrCreate 从检查点恢复

从检查点重新启动后,Spark 流选项卡消失

Spark结构化流检查点大小巨大

Spark Structured Streaming - 此查询不支持从检查点位置恢复

Spark Streaming - 检查点问题