Spark 结构化流文件源起始偏移量

Posted

技术标签:

【中文标题】Spark 结构化流文件源起始偏移量【英文标题】:Spark Structured Streaming File Source Starting Offset 【发布时间】:2018-07-18 00:00:12 【问题描述】:

有没有办法为 Spark 结构化文件流源指定起始偏移量?

我正在尝试从 HDFS 流式传输镶木地板:

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.readStream
  .parquet("/tmp/streaming/")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-test/checkpoint")
  .format("parquet")
  .option("path", "/tmp/parquet-sink")
  .trigger(Trigger.ProcessingTime(1.minutes))
  .start()

如我所见,第一次运行是处理在路径中检测到的所有可用文件,然后将偏移量保存到检查点位置并仅处理新文件,即接受年龄并且不存在于所见地图中的文件中。

我正在寻找一种方法,如何指定起始偏移量或时间戳或选项数量,以便在第一次运行时不处理所有可用文件。

有没有我正在寻找的方法?

【问题讨论】:

【参考方案1】:

感谢@jayfah,据我所知,我们可以使用以下技巧模拟 Kafka 的“最新”起始偏移:

    使用option("latestFirst", true)option("maxFilesPerTrigger", "1") 运行警告向上流,带有检查点、虚拟接收器和巨大的处理时间。这样,预热流会将最新的文件时间戳保存到检查点。

    使用option("maxFileAge", "0") 运行真实流,使用相同检查点位置的真实接收器。在这种情况下,流将只处理新可用的文件。

这很可能不是生产所必需的,并且有更好的方法,例如重新组织数据路径等,但至少我发现这种方式可以作为我问题的答案。

【讨论】:

我面临着类似的问题......每当我将新文件添加到 readStream 目录时......那个时候旧文件正在被处理......假设我首先添加了文件 1......然后什么都没有得到处理并且没有写入 hdfs ......下次当我添加文件 2 时,文件 1 拾取并反映在 hdfs 中......知道如何解决这个问题吗? @Mikhail,感谢您的解决方案。这个对我有用。如果要开始工作,我的挑战是要处理 4M 文件。这给我的驱动程序记忆带来了压力,我不得不手动创建检查点(没有追求太多)。我创建了两个程序(一个使用 Triggger.Once() 来加载具有历史范围的批处理,另一个用于流式传输)。我正在寻找设置起始偏移量的解决方案,您的解决方案似乎已经奏效。为什么它没有被标记为可接受的解决方案?有没有其他办法?【参考方案2】:

FileStreamSource 没有指定起始偏移量的选项。

但是您可以将latestFirst 的选项设置为true 以确保它首先处理最新的文件(此选项默认为false)

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

 spark.readStream
  .option("latestFirst", true)
  .parquet("/tmp/streaming/")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-test/checkpoint")
  .format("parquet")
  .option("path", "/tmp/parquet-sink")
  .trigger(Trigger.ProcessingTime(1.minutes))
  .start()

【讨论】:

以上是关于Spark 结构化流文件源起始偏移量的主要内容,如果未能解决你的问题,请参考以下文章

在同一个 Spark 会话中运行多个 Spark Kafka 结构化流查询会增加偏移量但显示 numInputRows 0

从最新的偏移量恢复结构化流

如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

Spark Streaming Kafka 偏移量 Offset 管理

我无法在链接描述文件中指定 Rust 程序的起始偏移量是不是有原因?

从符号名和偏移量自动获取源和行号