如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

Posted

技术标签:

【中文标题】如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?【英文标题】:How to get Kafka offsets for structured query for manual and reliable offset management? 【发布时间】:2018-02-19 13:00:43 【问题描述】:

Spark 2.2 引入了 Kafka 的结构化流式源。据我了解,它依靠 HDFS 检查点目录来存储偏移量并保证“恰好一次”的消息传递。

但旧码头(如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示,Spark Streaming 检查点无法跨应用程序或 Spark 升级恢复,因此不太可靠。作为一种解决方案,有一种做法是支持在支持 mysql 或 RedshiftDB 等事务的外部存储中存储偏移量。

如果我想将来自 Kafka 源的偏移量存储到事务数据库中,如何从结构化流批处理中获取偏移量?

以前可以通过将RDD转换为HasOffsetRanges来完成:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

但是使用新的 Streaming API,我有一个 DatasetInternalRow,我找不到获取偏移量的简单方法。 Sink API 只有 addBatch(batchId: Long, data: DataFrame) 方法,我怎么能想得到给定批次 id 的偏移量?

【问题讨论】:

你最终是如何实现这个的?能否请您粘贴您的伪代码...我需要实现... 【参考方案1】:

带有 Kafka 源的流式数据集将 offset 作为 field 之一。您可以简单地查询查询中的所有偏移量并将它们保存到 JDBC Sink 中

【讨论】:

对于简单的查询,我需要使用按分区、主题和聚合最大偏移量进行分组的额外 Spark 操作? 似乎不应该推荐这种方法,因为关于获得的偏移范围的信息已经存在(它在 KafkaSourceRDD 中),但由于它被映射到 InternalRow,我无法访问它。它只是丢失了,客户端必须浪费集群资源才能将其取回。 是的。从代码方面来看,这很容易,但这是必要的 抱歉,讨论过头了,但是通过 addBatch() 方法传递可选的偏移量会更好吗?看起来有一个带有一些偏移量概念的可重放流源是必需的,所以它应该在公共 API 中。目前,它隐藏在实现细节(内部行和 Kafka 模式)中。这使得实现可靠的自定义接收器变得更加困难。 @dnaumenko 也许不是偏移量,而是源的一般元数据:)【参考方案2】:

相关的 Spark DEV 邮件列表讨论线程是 here。

总结:

Spark Streaming 将支持在未来版本 (> 2.2.0) 中获取偏移量。 JIRA 票要关注 - https://issues-test.apache.org/jira/browse/SPARK-18258

对于 Spark

val checkpointRoot = // read 'checkpointLocation' from custom sink params
val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)

val endOffset: Map[TopicPartition, Long] = offsetSeqLog.get(batchId).map  endOffset =>
  endOffset.offsets.filter(_.isDefined).map  str =>
    JsonUtilsWrapper.jsonToOffsets(str.get.json)
  



/**
  * Hack to access private API
  * Put this class into org.apache.spark.sql.kafka010 package
  */
object JsonUtilsWrapper 
  def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = 
    JsonUtils.partitionOffsets(partitionOffsets)
  

  def jsonToOffsets(str: String): Map[TopicPartition, Long] = 
    JsonUtils.partitionOffsets(str)
  

这个endOffset 将包含每个主题/分区的直到偏移量。 获取起始偏移量是有问题的,因为您必须阅读“提交”检查点目录。但通常,您并不关心起始偏移量,因为存储结束偏移量足以可靠地重新启动 Spark 作业。

请注意,您还必须将处理后的批次 ID 存储在存储中。在某些情况下,Spark 可以使用相同的批处理 id 重新运行失败的批处理,因此请确保使用最新处理的批处理 id(您应该从外部存储中读取)初始化自定义接收器,并忽略 id

【讨论】:

【参考方案3】:

Spark 2.2 引入了 Kafka 的结构化流式源。据我了解,它依靠 HDFS 检查点目录来存储偏移量并保证“恰好一次”的消息传递。

正确。

每个触发 Spark Structured Streaming 都会将偏移量保存到检查点位置中的offset 目录(使用checkpointLocation 选项或spark.sql.streaming.checkpointLocation Spark 属性定义或随机分配),这应该保证偏移量被处理在最多一次。该功能称为Write Ahead Logs

检查点位置的另一个目录是 commits 目录,用于完成流式批处理,每个批处理一个文件(文件名是批处理 id)。

引用Fault Tolerance Semantics中的官方文档:

为此,我们设计了结构化流式传输源、接收器和执行引擎,以可靠地跟踪处理的确切进度,以便它可以通过重新启动和/或重新处理来处理任何类型的故障。假定每个流式源具有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)以跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流式接收器被设计为具有处理再处理的幂等性。结合使用可重放源和幂等接收器,结构化流式处理可以确保在任何故障下端到端的精确一次语义。

每次执行触发器StreamExecution 都会检查目录并“计算”已经处理了哪些偏移量。这为您提供至少一次语义,并且总共恰好一次

但旧文档 (...) 说 Spark Streaming 检查点无法跨应用程序或 Spark 升级恢复,因此不太可靠。

你称他们为“老”是有原因的,不是吗?

他们指的是旧的和(在我看来)死的 Spark Streaming,它不仅保留了偏移量,而且保留了导致检查点几乎不可用的情况的整个查询代码,例如当您更改代码时。

时代已经过去,结构化流式传输更加谨慎,检查点的时间和内容。

如果我想将来自 Kafka 源的偏移量存储到事务数据库中,如何从结构化流批处理中获取偏移量?

一种解决方案可能是实现或以某种方式使用用于处理偏移检查点的MetadataLog 接口。这可以工作。

我怎样才能获得给定批次 id 的偏移量?

目前不可能。

我的理解是,您将能够做到这一点,因为流的语义对您隐藏。您根本不应该处理这种称为偏移量的低级“事物”,Spark Structured Streaming 使用它来提供恰好一次的保证。

引用 Michael Armbrust 在 Spark 峰会上的演讲 Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark:

你不应该对流媒体进行推理

和further in the talk (on the next slide):

您应该编写简单的查询,Spark 应该不断更新答案


一种使用StreamingQueryProgress 获取偏移量(来自任何来源,包括Kafka)的方法,您可以使用StreamingQueryListener 和onQueryProgress 回调来拦截。

onQueryProgress(event: QueryProgressEvent): Unit 当有一些状态更新(摄取率更新等)时调用

使用StreamingQueryProgress,您可以使用SourceProgress 访问sources 属性,从而满足您的需求。

【讨论】:

哇,不错的答案 :) 但最后一点应该是第一点 :) 但是,投票是值得的 :) “你不应该。期间。” - 这不是我一直在寻找的答案 :) 如果您遵循 Spark 上的 JIRA 票证,获取偏移量仍然是一个有效的用例。例如,关于不将自己锁定到 Spark 的情况如何?如果我在外部存储中有偏移,我可以将我的 ETL 重新写入 Apache Flink,让它从我的存储中获取最新的偏移(默认情况下这是可靠的,因为所有数据/偏移更新都发生在一个事务中) 看起来 StreamingQueryListener 需要用于监视、调试等其他用途。我看不出如何在自定义接收器中使用它。 介意分享 JIRA 票证的链接吗? StreamingQueryListener 不适用于自定义接收器。这是肯定的。你说的对。我只提到它是为了全面了解偏移量对您隐藏的深度:) @JacekLaskowski 我不这么认为。它对正在发生的事情有一个很好的解释,它帮助我更好地理解,但它在某种程度上说“你不应该这样做”是误导性的。也许最好说,目前很难做到这一点 - 你要么将主题/分区和偏移量从 Source 传递到 Sink,要么从检查点目录中读取批处理 id 的偏移量。

以上是关于如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?的主要内容,如果未能解决你的问题,请参考以下文章

如何计算偏移量以绘制到画布中应用了比例变换的最近像素?

如何在火花结构化流式读取流中倒带 Kafka 偏移

在同一个 Spark 会话中运行多个 Spark Kafka 结构化流查询会增加偏移量但显示 numInputRows 0

如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?

如何从Kafka中的旧偏移点获取数据?

如何获取 kafka 主题分区的最新偏移量?