Spark 结构化流文件源起始偏移量
Posted
技术标签:
【中文标题】Spark 结构化流文件源起始偏移量【英文标题】:Spark Structured Streaming File Source Starting Offset 【发布时间】:2018-07-18 00:00:12 【问题描述】:有没有办法为 Spark 结构化文件流源指定起始偏移量?
我正在尝试从 HDFS 流式传输镶木地板:
spark.sql("SET spark.sql.streaming.schemaInference=true")
spark.readStream
.parquet("/tmp/streaming/")
.writeStream
.option("checkpointLocation", "/tmp/streaming-test/checkpoint")
.format("parquet")
.option("path", "/tmp/parquet-sink")
.trigger(Trigger.ProcessingTime(1.minutes))
.start()
如我所见,第一次运行是处理在路径中检测到的所有可用文件,然后将偏移量保存到检查点位置并仅处理新文件,即接受年龄并且不存在于所见地图中的文件中。
我正在寻找一种方法,如何指定起始偏移量或时间戳或选项数量,以便在第一次运行时不处理所有可用文件。
有没有我正在寻找的方法?
【问题讨论】:
【参考方案1】:感谢@jayfah,据我所知,我们可以使用以下技巧模拟 Kafka 的“最新”起始偏移:
使用option("latestFirst", true)
和option("maxFilesPerTrigger", "1")
运行警告向上流,带有检查点、虚拟接收器和巨大的处理时间。这样,预热流会将最新的文件时间戳保存到检查点。
使用option("maxFileAge", "0")
运行真实流,使用相同检查点位置的真实接收器。在这种情况下,流将只处理新可用的文件。
这很可能不是生产所必需的,并且有更好的方法,例如重新组织数据路径等,但至少我发现这种方式可以作为我问题的答案。
【讨论】:
我面临着类似的问题......每当我将新文件添加到 readStream 目录时......那个时候旧文件正在被处理......假设我首先添加了文件 1......然后什么都没有得到处理并且没有写入 hdfs ......下次当我添加文件 2 时,文件 1 拾取并反映在 hdfs 中......知道如何解决这个问题吗? @Mikhail,感谢您的解决方案。这个对我有用。如果要开始工作,我的挑战是要处理 4M 文件。这给我的驱动程序记忆带来了压力,我不得不手动创建检查点(没有追求太多)。我创建了两个程序(一个使用 Triggger.Once() 来加载具有历史范围的批处理,另一个用于流式传输)。我正在寻找设置起始偏移量的解决方案,您的解决方案似乎已经奏效。为什么它没有被标记为可接受的解决方案?有没有其他办法?【参考方案2】:FileStreamSource
没有指定起始偏移量的选项。
但是您可以将latestFirst
的选项设置为true
以确保它首先处理最新的文件(此选项默认为false)
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources
spark.readStream
.option("latestFirst", true)
.parquet("/tmp/streaming/")
.writeStream
.option("checkpointLocation", "/tmp/streaming-test/checkpoint")
.format("parquet")
.option("path", "/tmp/parquet-sink")
.trigger(Trigger.ProcessingTime(1.minutes))
.start()
【讨论】:
以上是关于Spark 结构化流文件源起始偏移量的主要内容,如果未能解决你的问题,请参考以下文章
在同一个 Spark 会话中运行多个 Spark Kafka 结构化流查询会增加偏移量但显示 numInputRows 0
如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?
Spark Streaming Kafka 偏移量 Offset 管理