Spark Streaming Write Ahead Log not replaying data after restart

Posted: 2016-01-21 11:20:25

【Question】:

In order to have a simple way of testing the Spark Streaming Write Ahead Log, I created a very simple custom input receiver that generates strings and stores them:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.collection.mutable.ArrayBuffer

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while(true) {
          //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          //To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}

I then created a simple application that uses the custom receiver to stream the data and process it:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val conf = new SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()
}

As you can see, the processing of each received RDD sleeps for 2 seconds while the strings are stored once per second. This creates a backlog: new strings pile up and should be stored in the WAL. Indeed, I can see the files in the checkpoint directory being updated. Running the application I get output like this:

[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]

As you would expect, data is being stored faster than it is processed. So I killed the application and restarted it. This time I commented out the sleep in foreachRDD so that the processing could clear any backlog:

[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]

As you can see, the new events are processed, but none from the previous batch. The old WAL logs are cleaned up and I see log messages like the following, but the old data does not get processed.

INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0

What am I doing wrong? I am using Spark 1.5.2.


【Answer 1】:

This was answered by Shixiong (Ryan) Zhu on the Spark Users mailing list.

As he suggested, the fix is to use StreamingContext.getOrCreate, so that after a restart the StreamingContext is recovered from the checkpoint directory instead of being created from scratch.
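A minimal sketch of what that change could look like for the code above (the createContext helper name is my own, and the wiring is an assumption based on his suggestion rather than his exact code):

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // All stream setup must happen inside this function. It is only invoked when no
  // checkpoint exists; after a restart the context, including the not-yet-processed
  // WAL blocks, is recovered from checkpointDir instead of being rebuilt here.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc.receiverStream(new InMemoryStringReceiver()).foreachRDD { (rdd: RDD[String]) =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()
}

With this structure, killing and restarting the application picks up the existing checkpoint, so batches that were written to the WAL but never processed can be replayed before new data is handled.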

