如何在火花结构化流式读取流中倒带 Kafka 偏移
Posted
技术标签:
【中文标题】如何在火花结构化流式读取流中倒带 Kafka 偏移【英文标题】:How to rewind Kafka Offsets in spark structured streaming readstream 【发布时间】:2018-05-11 16:22:37 【问题描述】:我有一个 Spark Structured Streaming 作业,它被配置为从 Kafka 读取数据。请通过代码检查readStream()
的参数以从Kafka读取最新数据。
我了解readStream()
在启动新查询而不是恢复时从第一个偏移量读取。
但我不知道每次在 IntelliJ 中重新开始工作时如何启动新查询。
val kafkaStreamingDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", AppProperties.getProp(AppConstants.PROPS_SERVICES_KAFKA_SERVERS))
.option("subscribe", AppProperties.getProp(AppConstants.PROPS_SDV_KAFKA_TOPICS))
.option("failOnDataLoss", "false")
.option("startingOffsets","earliest")
.load()
.selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")
我也试过用""""topicA":"0":0,"1":0"""
设置偏移量
下面是我的写流
val query = kafkaStreamingDF
.writeStream
.format("console")
.start()
每次我在 IntelliJ IDE 中重新启动作业时,日志都会显示偏移量已设置为最新,而不是 0 或最早。
有没有办法清理我的检查点,在这种情况下我不知道检查点目录在哪里,因为在上述情况下我没有指定任何检查点。
【问题讨论】:
当我按原样运行此代码并在 IntelliJ 中重新启动它时,查询总是从头开始读取。代码没有错。 【参考方案1】:Kafka 依赖属性auto.offset.reset
来处理偏移管理。
默认为“latest”,这意味着如果缺少有效的偏移量,消费者将从最新的记录(消费者开始运行后写入的记录)开始读取。另一种选择是“earliest”,这意味着如果缺少有效的偏移量,消费者将从头开始读取分区中的所有数据。
根据您的问题,您想阅读该主题的全部数据。因此将“startingOffsets
”设置为“earliest
”应该可以。但是,还要确保将 enable.auto.commit
设置为 false。
通过将enable.auto.commit
设置为true
意味着偏移量会以配置auto.commit.interval.ms
控制的频率自动提交。
将此设置为 true 会在从 Kafka 读取消息时自动将偏移量提交给 Kafka,这并不一定意味着 Spark 已完成处理这些消息。要启用对提交偏移量的精确控制,请将 Kafka 参数 enable.auto.commit
设置为 false
。
【讨论】:
enable.auto.commit 在 Spark Kafka 结构化流式处理中不可用【参考方案2】:尝试设置.option("kafka.client.id", "XX")
,以使用不同的client.id
。
【讨论】:
以上是关于如何在火花结构化流式读取流中倒带 Kafka 偏移的主要内容,如果未能解决你的问题,请参考以下文章
在火花结构化流中反序列化 kafka avro 主题的 int 编码无效
如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?