如何在火花结构化流式读取流中倒带 Kafka 偏移

Posted

技术标签:

【中文标题】如何在火花结构化流式读取流中倒带 Kafka 偏移【英文标题】:How to rewind Kafka Offsets in spark structured streaming readstream 【发布时间】:2018-05-11 16:22:37 【问题描述】:

我有一个 Spark Structured Streaming 作业,它被配置为从 Kafka 读取数据。请通过代码检查readStream()的参数以从Kafka读取最新数据。

我了解readStream() 在启动新查询而不是恢复时从第一个偏移量读取。

但我不知道每次在 IntelliJ 中重新开始工作时如何启动新查询。

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", AppProperties.getProp(AppConstants.PROPS_SERVICES_KAFKA_SERVERS))
  .option("subscribe", AppProperties.getProp(AppConstants.PROPS_SDV_KAFKA_TOPICS))
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

我也试过用""""topicA":"0":0,"1":0"""设置偏移量

下面是我的写流

val query = kafkaStreamingDF
  .writeStream
  .format("console")
  .start()

每次我在 IntelliJ IDE 中重新启动作业时,日志都会显示偏移量已设置为最新,而不是 0 或最早。

有没有办法清理我的检查点,在这种情况下我不知道检查点目录在哪里,因为在上述情况下我没有指定任何检查点。

【问题讨论】:

当我按原样运行此代码并在 IntelliJ 中重新启动它时,查询总是从头开始读取。代码没有错。 【参考方案1】:

Kafka 依赖属性auto.offset.reset 来处理偏移管理

默认为“latest”,这意味着如果缺少有效的偏移量,消费者将从最新的记录(消费者开始运行后写入的记录)开始读取。另一种选择是“earliest”,这意味着如果缺少有效的偏移量,消费者将从头开始读取分区中的所有数据。

根据您的问题,您想阅读该主题的全部数据。因此将“startingOffsets”设置为“earliest”应该可以。但是,还要确保将 enable.auto.commit 设置为 false。

通过将enable.auto.commit 设置为true 意味着偏移量会以配置auto.commit.interval.ms 控制的频率自动提交。

将此设置为 true 会在从 Kafka 读取消息时自动将偏移量提交给 Kafka,这并不一定意味着 Spark 已完成处理这些消息。要启用对提交偏移量的精确控制,请将 Kafka 参数 enable.auto.commit 设置为 false

【讨论】:

enable.auto.commit 在 Spark Kafka 结构化流式处理中不可用【参考方案2】:

尝试设置.option("kafka.client.id", "XX"),以使用不同的client.id

【讨论】:

以上是关于如何在火花结构化流式读取流中倒带 Kafka 偏移的主要内容,如果未能解决你的问题,请参考以下文章

在火花结构化流中反序列化 kafka avro 主题的 int 编码无效

如何在火花流中刷新加载的数据帧内容?

从最新的偏移量恢复结构化流

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

基于kafka分区的结构化流式读取