在 Spark Streaming 中,如何检测空批次?
Posted
技术标签:
【中文标题】在 Spark Streaming 中,如何检测空批次?【英文标题】:In Spark Streaming, how to detect for an empty batch? 【发布时间】:2015-05-21 23:28:49 【问题描述】:让我们以有状态的流式字数统计为例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java。是否可以仅在将新单词添加到流中时才打印单词计数RDD?
【问题讨论】:
【参考方案1】:这就是我的做法。创建一个空 RDD,它是您的 previousWindow。然后在 forEachRDD 中,计算最后一个窗口和当前窗口之间的差异。如果当前窗口包含前一个窗口中没有的记录,则批次中有新内容。最后,将前一个窗口设置为当前窗口中的内容。
...
var previousWindowRdd = sc.emptyRDD[String]
dStream.foreachRDD
windowRdd =>
if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
...
def processWindow(windowRdd: RDD[String]) =
val newInBatch = windowRdd.subtract(previousWindowRdd)
if (!newInBatch.isEmpty())
processNewBatch(windowRdd)
previousWindowRdd = windowRdd
【讨论】:
【参考方案2】:这就是我避免空批次并在同一目录中覆盖的方法。
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
messageRecBased.foreachRDD rdd =>
rdd.repartition(1)
val eachRdd = rdd.map(record => record.value)
if(!eachRdd.isEmpty)
eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
【讨论】:
以上是关于在 Spark Streaming 中,如何检测空批次?的主要内容,如果未能解决你的问题,请参考以下文章
spark版本定制十八:Spark Streaming中空RDD处理及流处理程序优雅的停止
Spark Streaming中空RDD处理及流处理程序优雅的停止
如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?