Spark Structured Streaming 2.3.0 中的水印
Posted
技术标签:
【中文标题】Spark Structured Streaming 2.3.0 中的水印【英文标题】:Watermarking in Spark Structured Streaming 2.3.0 【发布时间】:2019-06-14 05:53:57 【问题描述】:我在 Spark Structured Streaming 2.3.0 中从 Kafka 读取数据。数据包含一些教师的信息,有teacherId、teacherName 和teacherGroupsIds。 TeacherGroupsIds 是一个包含组 ID 的数组列。在我的任务中,我必须将具有组 ID 的列映射到包含组名称信息的列([1,2,3] => [Suns,Books,Flowers])。名称和 ID 存储在 HBase 中,并且可以每天更改。稍后我必须将数据发送到另一个 Kafka 主题。
所以,我从两个来源读取数据 - Kafka 和 HBase。我使用 shc 库从 HBase 读取数据。
首先,我分解数组列(组 ID),然后加入来自 HBase 的数据。
在下一步中,我想使用teacherId 聚合数据。但是我使用的附加模式不支持此操作。
我尝试过添加水印,但目前不起作用。我添加了一个带有时间戳的新列,我将按此列分组。
Dataset<Row> inputDataset = //reading from Kafka
Dataset<Row> explodedDataset = // explode function applied and join with HBase
Dataset<Row> outputDataset = explodedDataset
.withColumn("eventTime", lit(current_timestamp()))
.withWatermark("eventTime", "2 minutes")
.groupBy(window(col("eventTime"), "5 seconds"), col("teacherId"))
.agg(collect_list(col("groupname")));
实际结果在输出中显示空数据框。没有任何行。
【问题讨论】:
我使用“追加模式”将数据写入另一个 Kafka 主题。 【参考方案1】:问题是current_timestamp()
。
current_timestamp 返回该时刻的时间戳,因此,如果您使用此列创建数据框并打印结果,则打印当前时间戳,但如果您处理 df 并打印同一列,则打印新时间戳.
此解决方案在本地工作,但有时在分布式系统中会失败,因为工作人员在收到打印数据的命令时,此数据已超出时间戳范围。
【讨论】:
以上是关于Spark Structured Streaming 2.3.0 中的水印的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming - 1
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?