从最新的偏移量恢复结构化流
Posted
技术标签:
【中文标题】从最新的偏移量恢复结构化流【英文标题】:Resuming Structured Streaming from latest offsets 【发布时间】:2020-06-23 13:16:52 【问题描述】:我想创建 Spark Structured Streaming 作业,从 Kafka 源读取消息,写入 Kafka 接收器,失败后将恢复仅读取当前最新消息。因此,我不需要为我的工作设置检查站。
但在结构化流中写入 Kafka 接收器时,似乎没有禁用检查点的选项。据我了解,即使我在来源上指定:
.option("startingOffsets", "latest")
只有在流第一次运行时才会考虑,失败后流将从检查点恢复。有一些解决方法吗?有没有办法禁用检查点?
【问题讨论】:
如果您添加检查点位置,您是否遇到任何问题? 我正在添加它 - 尝试跳过它会导致错误。我正在使用数据块,它们在 S3 之上实现 hdfs (dbfs)。有大量与检查点进程相关的流量,产生了不必要的成本。此外,如果我正确理解文档,失败后我的工作将始终从检查点重新开始,而不是从最新/当前消息重新开始,但我还没有测试它。我不希望它从检查点开始。 我明白了。下面的解决方案会起作用吗?我已经给出了 hdfs 的解决方案 .. 可能你可以根据 databricks 文件系统 (dbfs) 进行转换 【参考方案1】:解决方法是从代码中删除现有的检查点位置,以便每次开始获取最新的偏移数据。
import org.apache.hadoop.fs.FileSystem, Path
val checkPointLocation="/path/in/hdfs/location"
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(checkPointLocation),true)
// Delete check point location if exist.
val options = Map(
"kafka.bootstrap.servers"-> "localhost:9092",
"topic" -> "topic_name",
"checkpointLocation" -> checkPointLocation,
"startingOffsets" -> "latest"
)
df
.writeStream
.format("kafka")
.outputMode("append")
.options(options)
.start()
.awaitTermination()
【讨论】:
我认为它应该以某种方式工作,在重新启动查询之前清除检查点将确保作业无法从这些检查点恢复,但它会重新创建文件夹并重新启动检查点。所以成本问题依然存在。以上是关于从最新的偏移量恢复结构化流的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?
在同一个 Spark 会话中运行多个 Spark Kafka 结构化流查询会增加偏移量但显示 numInputRows 0
Spark Streaming Kafka 偏移量 Offset 管理