Spark DStream 使用转换定期调用 saveAsObjectFile 不能按预期工作

Posted

技术标签:

【中文标题】Spark DStream 使用转换定期调用 saveAsObjectFile 不能按预期工作【英文标题】:Spark DStream periodically call saveAsObjectFile using transform does not work as expected 【发布时间】:2017-08-26 04:39:35 【问题描述】:

我使用DirectKafkaStream API 1 从 Kafka 读取数据,进行一些转换,更新计数,然后将数据写回 Kafka。实际上,这种代码和平正在测试中:

kafkaStream[Key, Value]("test")
      .map(record => (record.key(), 1))
      .updateStateByKey[Int](
        (numbers: Seq[Int], state: Option[Int]) =>
          state match 
            case Some(s) => Some(s + numbers.length)
            case _ => Some(numbers.length)
          
      )
      .checkpoint(this)("count") 
        case (save: (Key, Int), current: (Key, Int)) =>
          (save._1, save._2 + current._2)
      
      .map(_._2)
      .reduce(_ + _)
      .map(count => (new Key, new Result[Long](count.toLong)))
      .toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName)

checkpoint 运算符是对我创建的DStream API 的扩充,它实际上应该使用saveAsObjectFile 将给定DStream 的一个Time 的一个RDD 保存到HDFS 中。实际上,它将每 60 个微批次 (RDD) 的结果保存到 HDFS 中。

检查点执行以下操作:

def checkpoint(processor: Streaming)(name: String)(
mergeStates: (T, T) => T): DStream[T] = 
val path = processor.configuration.get[String](
  "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
  Reflection.canonical(processor.getClass) + "/" + name + "/"
logInfo(s"Checkpoint base path is [$path].")

processor.registerOperator(name)

if (processor.fromCheckpoint && processor.restorationPoint.isDefined) 
  val restorePath = path + processor.restorationPoint.get.ID.stringify
  logInfo(s"Restoring from path [$restorePath].")
  checkpointData = context.objectFile[T](restorePath).cache()

  stream
    .transform((rdd: RDD[T], time: Time) => 
      val merged = rdd
        .union(checkpointData)
        .map[(Boolean, T)](record => (true, record))
        .reduceByKey(mergeStates)
        .map[T](_._2)

      processor.maybeCheckpoint(name, merged, time)

      merged
    
  )
 else 
  stream
    .transform((rdd: RDD[T], time: Time) => 
      processor.maybeCheckpoint(name, rdd, time)

      rdd
    )


有效的代码如下:

dstream.transform((rdd: RDD[T], time: Time) => 
      processor.maybeCheckpoint(name, rdd, time)

      rdd
    )

上面代码中的dstream变量是前一个运算符的结果,即updateStateByKey,所以在updateStateByKey之后调用了一个变换。

def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = 
  if (doCheckpoint(time)) 
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [$rdd.id].")
    val newPath = configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
    logInfo(s"Saving new checkpoint to [$newPath].")
    rdd.saveAsObjectFile(newPath)
    registerCheckpoint(name, Operator(name), time)
    logInfo(s"Checkpoint completed for operator [$name].")
  

如您所见,大部分代码只是记账,但有效地调用了 saveAsObjectFile

问题是即使来自updateStateByKey 的结果RDD 应该自动持久化,当saveAsObjectFile 在每Xth 个微批处理上调用时,Spark 将从头开始重新计算所有内容,从流式作业开始,从再次阅读 Kafka 的所有内容开始。我尝试在 DStreams 和 RDDs 上放置并强制使用不同级别的存储 cachepersist

微批次:

作业 22 的 DAG:

运行 saveAsObjectFile 的作业的 DAG:

可能是什么问题?

谢谢!

1 使用 Spark 2.1.0。

【问题讨论】:

并不是saveAsObjectFile是这里的坏小子,一个简单的count也是一样的, 通过调用saveAsObjectFile 告诉spark 执行一个动作,该动作执行在初始流上定义的所有转换(记住:转换是惰性的)。当您之后执行其他操作(例如reduce)时,将再次执行相同的转换。为防止发生这种情况,您可以在第一个操作之前调用dstream.cache()(即checkpoint(this)(...) @AdiGerber 我当然试过了。就在checkpoint 充实之前,缓存甚至尝试了持久化磁盘存储,以确保仅内存存储级别不会强制RDD 被驱逐。无论如何,我只会“重用”最后一个缓存的 RDD,它的大小只有 4KB - 用少量数据进行测试 - 300.000 个键值记录,基数为 10。updateStateByKey 实际上存储了 10 个键。 请您发布checkpoint 函数签名,最好是正文? @ImDarrenG 谢谢你的提问,我已经添加了。 【参考方案1】:

我相信使用transform 定期检查点会导致意外的缓存行为。

使用foreachRDD 执行定期检查点将允许 DAG 保持足够稳定以有效缓存 RDD。

我几乎可以肯定,这是我们不久前遇到的类似问题的解决方案。

【讨论】:

Checkpointing 会切割 linage 图(当您在 DStream 上调用它时),但在这里仍然无法帮助我。问题不在于行数图很宽。 updateStateByKey 创建了一个StateDStream ,当你查看它时,你会发现它默认被持久化到内存中,实际上每个RDD都被持久化到内存中。实际上,我在那里所做的是我在这样的 RDD 上调用 count,它应该通过缓存的 RDD ID 从 BlockManager 中提取,而不是重新计算它。我会试试你的解决方案,以防万一,但我认为这对我没有帮助。 请做,让我们知道,消除低垂的果实是个好主意! :) 话虽如此,我认为你是对的。烦人的是,我确定我以前见过一个非常相似的问题,但不记得我们是如何解决的。 我已经设置了ssc.checkpoint,因为当使用updateStateByKey 或类似的“flatMap with state”运算符时,Spark 会强制设置checkpoint 目录。如果要强制在没有ssc.checkpoint 的情况下启动作业生成,则必须为updateStateByKey 显式关闭检查点。无论如何,我已经在updateStateByKey 之后设置了手动 Spark checkpointlocalCheckpoint。没有按预期使用。 很高兴我们发现了这个问题。已相应地编辑了答案。

以上是关于Spark DStream 使用转换定期调用 saveAsObjectFile 不能按预期工作的主要内容,如果未能解决你的问题,请参考以下文章

Spark DStream 转换

如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?

将带有joda.DateTime的案例类的DStream转换为Spark DataFrame

Spark Streaming的核心DStream之转换操作实例

Spark Streaming的核心DStream之转换操作实例

070 DStream中的transform和foreachRDD函数