如何使用 Kafka 控制台消费者在两个时间戳之间消费消息

Posted

技术标签:

【中文标题】如何使用 Kafka 控制台消费者在两个时间戳之间消费消息【英文标题】:How to consume messages between two timestamps using Kafka Console Consumer 【发布时间】:2020-07-08 00:11:46 【问题描述】:

是否可以在 Kafka 控制台消费者中检索特定时间戳范围的消息?

例如昨天 08:00 到 09:00 之间的 kafka 消息。

【问题讨论】:

您如何使用数据 - 哪种客户端库/语言? 我正在使用 java,但老实说,我想在终端中查看结果,而不是更改 java 代码 【参考方案1】:

您可以使用kcat 来消费两个时间戳之间的消息:

kcat -b localhost:9092 -C -t mytopic -o s@1568276612443 -o e@1568276617901

在哪里

s@ 表示以毫秒为单位的起始时间戳 e@ 表示以毫秒为单位的结束时间戳(不包括在内)

【讨论】:

s@1568276612443 ,这是从一天开始的毫秒数吗?? @Hummingbird 这是一个时间戳。您可以使用任何您想要的时间戳。 s@ 表示开始时间戳,e@ 表示结束时间戳。 是的,但是号码1568276612443,这是什么??毫秒或纳秒,如果是,从什么时候开始?一天的开始还是某个特定日子的开始?? @Hummingbird 好的。您要使用的时间戳间隔是多少? 没关系,我找到了我想要的时间戳。反正我想实现命令的服务器没有kafkacat。【参考方案2】:

是的,你可以从Kafka version 0.10.1开始。 使用KafkaConsumer中的函数offsetsForTimes

按时间戳查找给定分区的偏移量。这 每个分区的返回偏移量是其最早的偏移量 时间戳大于或等于给定时间戳 对应分区。这是一个阻塞调用。消费者这样做 不必分配分区。

【讨论】:

嘿 ofek ,我实际上希望在终端中完成,使用 kafka-console-consumer shell 脚本 您可以使用KafkaConsumer api 来检索相关的偏移量,然后在kafka-console-consumer 中使用--offset myoffset,我认为没有其他解决方案而不是在@ 中使用--property print.timestamp=true 987654328@ 并使用grep 过滤每条消息的打印时间,以适应您的范围。如果这对你来说是个好方向,我可以写得更详细。 或者,如果您的经纪人可以使用 PLAINTEXT,您可以使用 Kafka 工具之一通过时间戳获取偏移量:kafka-run-class.sh kafka.tools.GetOffsetShell ... --time <Long: timestamp>。然后,将该偏移量传递给 kafka-console-consumer。 @OfekHod 但我不会还在使用 api 吗??

以上是关于如何使用 Kafka 控制台消费者在两个时间戳之间消费消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在kafka-python和confluent-kafka之间做出选择

Kafka - 使用高级消费者的延迟队列实现

kafka重复消费的原因

如何获取Kafka的消费者详情——从Scala到Java的切换

Kafka 中的 Consumer Id 和 Group Id:是啥让两个消费者相同

aws创建kafka(msk)