如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读取
Posted
技术标签:
【中文标题】如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读取【英文标题】:How to save latest offset that Spark consumed to ZK or Kafka and can read back after restart 【发布时间】:2015-10-29 01:32:23 【问题描述】:我使用 Kafka 0.8.2
从 AdExchange 接收数据,然后使用 Spark Streaming 1.4.1
将数据存储到 MongoDB
。
我的问题是当我重新启动我的Spark Streaming
工作时,例如更新新版本、修复错误、添加新功能。它将继续读取当时最新的offset
和kafka
,然后在重新启动作业期间我将丢失 AdX 推送到 kafka 的数据。
我尝试auto.offset.reset -> smallest
之类的东西,但它会从 0 接收 -> 最后数据很大并且在 db 中重复。
我也尝试将特定的group.id
和consumer.id
设置为Spark
,但它是一样的。
如何将消耗的最新offset
spark 保存到zookeeper
或kafka
,然后可以从中读取到最新的offset
?
【问题讨论】:
【参考方案1】:createDirectStream 函数的构造函数之一可以获得一个映射,该映射将保存分区 id 作为键和您开始消费的偏移量作为值。
看看这里的api:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html 我说的地图通常叫做:fromOffsets
您可以在地图中插入数据:
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)
并在创建直接流时使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
每次迭代后,您可以使用以下方法获得处理后的偏移量:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
您将能够使用此数据在下一次迭代中构建 fromOffsets 映射。
您可以在此处查看完整的代码和用法:页面末尾的https://spark.apache.org/docs/latest/streaming-kafka-integration.html
【讨论】:
但是如何将使用的最新偏移量保存到 ZK 或 Kafka。我尝试启用kafkaParams ++= Map[String, String]("auto.commit.interval.ms" -> "1000") kafkaParams ++= Map[String, String]("zookeeper.sync.time.ms" -> "200") kafkaParams ++= Map[String, String]("zookeeper.session.timeout.ms" -> "400")
,但它不起作用
其中一个选项是我告诉您使用 .offsetRanges 数据结构。在给定迭代中处理完流之后,您可以执行以下操作:dStream.foreachRDD rdd => val x = rdd.asInstanceOf[HasOffsetRanges].offsetRanges; // Do something with X (save it external FS for example)
x 将保存 RDD 的每个主题-分区组合的最后处理偏移量。如果您需要仅具有一次语义,则必须手动支持它,但这是可能的。
我不想保存在外部存储中,因为 ZK 和 Kafka 可以处理这个问题。
我相信他们做不到。 Spark 1.3.1 将其关于如何使用 Kafka 作为数据源的方法从 Write Ahead Logs 更改为直接流。直接流使用 Kafka SimpleConsumer 从 Kafka 获取消息。您可以在此处阅读:cwiki.apache.org/confluence/display/KAFKA/… 使用 SimpleConsumer 的不利方面之一是您必须跟踪自己已经使用的偏移量。只要 Spark 流使用简单的消费者,你就不会从 Kafka / ZK 的角度找到解决方案。但 Spark 可能会在 Kafka 之上添加自己的处理方式。
任何可靠的存储都应该能够胜任。我通常将数据保存到 HDFS,因为我认为这是最简单的解决方案。我想不出 Redis 不能同时完成这项工作的原因。【参考方案2】:
我还没有 100% 弄清楚这一点,但您最好的选择可能是设置 JavaStreamingContext.checkpoint()。
有关示例,请参阅 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing。
根据一些博客条目 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md 有一些警告,但几乎感觉它涉及某些边缘案例,这些案例只是被提及而没有实际解释。
【讨论】:
检查点是正确的方法,以防您不对您的 StreamingContext 进行任何更改,因为那时您将能够自动从正确的偏移量继续处理(Spark 会注意这一点)。如果您想添加功能/纠正错误(显然 giaosudau 想要这样做),您将经常更改流上下文,因此无法使用检查点目录。您提供的最后一个链接完美地解释了它。 @MichaelKopaniov 是否有任何方法可以校验上下文函数并在函数发生更改时使先前的上下文无效?在这种情况下,它将退回到从存储(fs,数据库)读取偏移量 @Stephane 自从我处理了这个问题后几天过去了,所以我可能弄错了,但据我记得在旧的 Spark 流式传输( @Stephane 但是你可以做的是有一些可配置的参数来指示你是想使用检查点目录中的流上下文还是你想创建一个自己的新上下文。如果此参数指定您要创建一个新的上下文,那么您将创建 if from (fs, database) 并在将数据检查点到检查点目录时覆盖之前的上下文。 不一样 - 来自文档:“如果启用 Spark 检查点,偏移量将存储在检查点中。这很容易启用,但也有缺点。你的输出操作必须是幂等的,因为您将获得重复的输出;事务不是一种选择。此外,如果您的应用程序代码已更改,您将无法从检查点恢复。对于计划的升级,您可以通过在运行旧代码的同时运行新代码来缓解这种情况(因为无论如何输出都需要是幂等的,它们不应该发生冲突)。但是对于需要更改代码的计划外故障,您将丢失数据..”【参考方案3】:添加到 Michael Kopaniov 的回答中,如果您真的想使用 ZK 作为您存储和加载偏移地图的位置,您可以。
但是,由于您的结果没有输出到 ZK,除非您的输出操作是幂等的(听起来不是),否则您将无法获得可靠的语义。
如果可以将您的结果与单个原子操作中的偏移量一起存储在 mongo 中的同一文档中,那可能对您更好。
更多详情请见https://www.youtube.com/watch?v=fXnNEq1v3VA
【讨论】:
【参考方案4】:这里有一些代码可用于在 ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/ 中存储偏移量
下面是一些代码,您可以在调用 KafkaUtils.createDirectStream 时使用偏移量: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/
【讨论】:
这两个链接现在都已损坏,这就是为什么社区总是建议将解决方案作为答案的一部分与链接一起发布,而不仅仅是链接。以上是关于如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读取的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming Kafka 偏移量 Offset 管理
jdbc PostgreSQL 中的偏移时间(带时区的时间)