在 Spark Streaming 中,如何检测空批次?

Posted

技术标签:

【中文标题】在 Spark Streaming 中,如何检测空批次?【英文标题】:In Spark Streaming, how to detect for an empty batch? 【发布时间】:2015-05-21 23:28:49 【问题描述】:

让我们以有状态的流式字数统计为例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java。是否可以仅在将新单词添加到流中时才打印单词计数RDD?

【问题讨论】:

【参考方案1】:

这就是我的做法。创建一个空 RDD,它是您的 previousWindow。然后在 forEachRDD 中,计算最后一个窗口和当前窗口之间的差异。如果当前窗口包含前一个窗口中没有的记录,则批次中有新内容。最后,将前一个窗口设置为当前窗口中的内容。

  ...

  var previousWindowRdd = sc.emptyRDD[String]

  dStream.foreachRDD 
    windowRdd => 
      if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
    
  

  ...

def processWindow(windowRdd: RDD[String]) = 
  val newInBatch = windowRdd.subtract(previousWindowRdd)

  if (!newInBatch.isEmpty())
    processNewBatch(windowRdd)

  previousWindowRdd = windowRdd

【讨论】:

【参考方案2】:

这就是我避免空批次并在同一目录中覆盖的方法。

import java.time.format.DateTimeFormatter
import java.time.LocalDateTime

   messageRecBased.foreachRDD rdd =>
        rdd.repartition(1)
        val eachRdd = rdd.map(record => record.value)
        if(!eachRdd.isEmpty)
          eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
      

【讨论】:

以上是关于在 Spark Streaming 中,如何检测空批次?的主要内容,如果未能解决你的问题,请参考以下文章

spark版本定制十八:Spark Streaming中空RDD处理及流处理程序优雅的停止

Spark Streaming中空RDD处理及流处理程序优雅的停止

Spark学习笔记——Spark Streaming

如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?

如何在 Spark Streaming 中自动重启故障节点?

如何使用 Python 在 Spark Structured Streaming 中查看特定指标