从最新的偏移量恢复结构化流

Posted

技术标签:

【中文标题】从最新的偏移量恢复结构化流【英文标题】:Resuming Structured Streaming from latest offsets 【发布时间】:2020-06-23 13:16:52 【问题描述】:

我想创建 Spark Structured Streaming 作业,从 Kafka 源读取消息,写入 Kafka 接收器,失败后将恢复仅读取当前最新消息。因此,我不需要为我的工作设置检查站。

但在结构化流中写入 Kafka 接收器时,似乎没有禁用检查点的选项。据我了解,即使我在来源上指定:

.option("startingOffsets", "latest")

只有在流第一次运行时才会考虑,失败后流将从检查点恢复。有一些解决方法吗?有没有办法禁用检查点?

【问题讨论】:

如果您添加检查点位置,您是否遇到任何问题? 我正在添加它 - 尝试跳过它会导致错误。我正在使用数据块,它们在 S3 之上实现 hdfs (dbfs)。有大量与检查点进程相关的流量,产生了不必要的成本。此外,如果我正确理解文档,失败后我的工作将始终从检查点重新开始,而不是从最新/当前消息重新开始,但我还没有测试它。我不希望它从检查点开始。 我明白了。下面的解决方案会起作用吗?我已经给出了 hdfs 的解决方案 .. 可能你可以根据 databricks 文件系统 (dbfs) 进行转换 【参考方案1】:

解决方法是从代码中删除现有的检查点位置,以便每次开始获取最新的偏移数据。

import org.apache.hadoop.fs.FileSystem, Path

val checkPointLocation="/path/in/hdfs/location"
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

fs.delete(new Path(checkPointLocation),true) 
// Delete check point location if exist.

val options = Map(
    "kafka.bootstrap.servers"-> "localhost:9092",
    "topic" -> "topic_name",
    "checkpointLocation" -> checkPointLocation,
    "startingOffsets" -> "latest"
  )
df
  .writeStream
  .format("kafka")
  .outputMode("append")
  .options(options)
  .start()
  .awaitTermination()

【讨论】:

我认为它应该以某种方式工作,在重新启动查询之前清除检查点将确保作业无法从这些检查点恢复,但它会重新创建文件夹并重新启动检查点。所以成本问题依然存在。

以上是关于从最新的偏移量恢复结构化流的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流文件源起始偏移量

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

在同一个 Spark 会话中运行多个 Spark Kafka 结构化流查询会增加偏移量但显示 numInputRows 0

Spark Streaming Kafka 偏移量 Offset 管理

如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

C 语言结构体 ( 结构体偏移量计算 | 代码示例 )