如何从Kafka中的旧偏移点获取数据?
Posted
技术标签:
【中文标题】如何从Kafka中的旧偏移点获取数据?【英文标题】:How to get data from old offset point in Kafka? 【发布时间】:2013-02-02 20:25:58 【问题描述】:我正在使用 zookeeper 从 kafka 获取数据。在这里,我总是从最后一个偏移点获取数据。有什么方法可以指定偏移时间获取旧数据吗?
有一个选项 autooffset.reset。它接受最小或最大。有人可以解释什么是最小和最大的。 autooffset.reset 可以帮助从旧偏移点而不是最新偏移点获取数据吗?
【问题讨论】:
【参考方案1】:请参阅有关 kafka 配置的文档:http://kafka.apache.org/08/configuration.html 以查询偏移参数的最小值和最大值。
顺便说一句,在探索 kafka 时,我想知道如何为消费者重播所有消息。我的意思是,如果一个消费者组已经轮询了所有消息并且它想要重新获取这些消息。
可以实现的方式是从zookeeper中删除数据。使用 kafka.utils.ZkUtils 类删除 zookeeper 上的节点。下面是它的用法:
ZkUtils.maybeDeletePath($zkhost:zkport", "/consumers/$group.id");
【讨论】:
【参考方案2】:消费者始终属于一个组,对于每个分区,Zookeeper 都会跟踪分区中该消费者组的进度。
要从头开始获取,您可以删除所有与 Hussain 提到的进度相关的数据
ZkUtils.maybeDeletePath($zkhost:zkport", "/consumers/$group.id");
你也可以指定你想要的分区偏移量,在core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala中指定
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
然而偏移量不是时间索引的,但你知道每个分区都是一个序列。
如果您的消息包含时间戳(请注意,此时间戳与 Kafka 收到您的消息的那一刻无关),您可以尝试做一个索引器,尝试通过将偏移量增加 N 来逐步检索一个条目,并将元组(主题 X,第 2 部分,偏移量 100,时间戳)存储在某处。
当您想要检索指定时刻的条目时,您可以对粗略索引应用二进制搜索,直到找到您想要的条目并从那里获取。
【讨论】:
【参考方案3】:他们说来自卡夫卡documentation “kafka.api.OffsetRequest.EarliestTime() 在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime() 只会流式传输新消息。不要假设偏移量0是开始偏移量,因为消息会随着时间的推移而超出日志。"
在此处使用 SimpleConsumerExample:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
类似问题:Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)
这可能会有所帮助
【讨论】:
他们也有代码示例供参考。值得一看 Hild 所指的例子是:cwiki.apache.org/confluence/display/KAFKA/… 你不能使用'Consumer'例子,你必须使用'SimpleConsumerDemo'例子来玩偏移量。【参考方案4】:Kafka 协议文档是处理请求/响应/偏移量/消息的绝佳来源: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol 您使用简单消费者示例作为以下代码演示状态的地方:
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
设置 readOffset 以开始初始偏移量。但是您需要检查最大偏移量,上述方法将根据 addFetch 方法的最后一个参数中的 FetchSize 提供有限的偏移量。
【讨论】:
请检查 Kafka 0.9.0.0 版本中提供的新 Api,通过结合简单和高级消费者,他们已经提升了一步。【参考方案5】:暂时
Kafka FAQ 给出了这个问题的答案。
如何使用 OffsetRequest 准确获取某个时间戳的消息偏移量?
Kafka 允许按时间查询消息的偏移量,并且以段粒度进行。 timestamp 参数是 unix 时间戳,按时间戳查询偏移量会返回不晚于给定时间戳附加的消息的最新可能偏移量。时间戳有 2 个特殊值 - 最新和最早。对于 unix 时间戳的任何其他值,Kafka 将获取不晚于给定时间戳创建的日志段的起始偏移量。因此,由于仅在段粒度上提供偏移量请求,因此对于较大的段大小,偏移量获取请求返回的结果不太准确。
为了获得更准确的结果,您可以根据时间 (log.roll.ms) 而不是大小 (log.segment.bytes) 来配置日志段大小。但是应该小心,因为这样做可能会由于频繁的日志段滚动而增加文件处理程序的数量。
未来计划
Kafka 将为消息格式添加时间戳。参考
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
【讨论】:
【参考方案6】:使用 KafkaConsumer,您可以使用 Seek、SeekToBeginning 和 SeekToEnd 在流中移动。
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)
另外,如果没有提供分区,它将寻找所有当前分配的分区的第一个偏移量。
【讨论】:
【参考方案7】:你试过吗?
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
它将打印给定主题的所有消息,在本例中为“test”。
更多详情来自此链接https://kafka.apache.org/quickstart
【讨论】:
以上是关于如何从Kafka中的旧偏移点获取数据?的主要内容,如果未能解决你的问题,请参考以下文章
如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?