如何在 Spark 结构化流中保存通过水印丢弃的记录

Posted

技术标签:

【中文标题】如何在 Spark 结构化流中保存通过水印丢弃的记录【英文标题】:How to save the records that are dropped by watermarking in spark structured streaming 【发布时间】:2020-02-26 16:42:12 【问题描述】:

水印可以自动删除 Apache Spark 结构化流中的旧状态数据。 在structured-streaming-programming-guide.md 中,字数统计示例演示了水印如何轻松删除到达系统较晚的记录或事件。 (https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md)

words.withWatermark("timestamp", "10 分钟")

有没有办法将通过水印删除或丢弃的记录保存在磁盘或表中?

【问题讨论】:

查看手册说明没有这种可能性。 应该有一种方法或函数将删除的记录存储在磁盘上,以供审查、分析或审计之用。我们不能一味地忽视后期事件。 不确定我是否同意,也许 Flink 有,它有一些扩展。 不也是一个答案。 【参考方案1】:

是的,spark 没有追踪这些记录的功能,但是 flink 可以!

【讨论】:

【参考方案2】:

通过支付一些性能开销,使用查询侦听器 + 广播变量和过滤函数应该是可行的。比如:

class WaterMark extends Serializable 

 var ws: Long = 0L;
 def set(value: Long) : Unit = 
   ws = value
 
def get(): Long =  ws


var currentWs = spark.sparkContext.broadcast[WaterMark](new WaterMark) 
 
 df.filter(row => 
    if(row.get("timestamp") < currentWs.value.ws)
   //this will be filtered by watermark. we can persist it using custom method
   .........................
  
  //Not filtering the row as that would be done by watermarking 
  true
  )   
..............................

class QueryListener (currentWs: Broadcast[WaterMark]) extends StreamingQueryListener 

import java.util.Locale

val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
format.setTimeZone(TimeZone.getTimeZone("UTC"))
...........................
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = 

//un-persist the broadcast var so that it can be updated with next batch watermark
currentWs.unpersist(true)
currentWs.value.set(format.parse(event.progress.eventTime.get("watermark")).getTime)
  println("Listener: " + currentWs.value.ws)

 
......................

注意:我自己没有尝试过端到端,并且由于失败或代码更改(checkpoint_dir/commit 文件夹救援?? )

【讨论】:

以上是关于如何在 Spark 结构化流中保存通过水印丢弃的记录的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务

spark结构化流如何计算水印

如何在 Spark 结构化流中获取书面记录的数量?

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

Spark 结构化流中的临时视图