如何根据时间戳获取Kafka消息

Posted

技术标签:

【中文标题】如何根据时间戳获取Kafka消息【英文标题】:How to get Kafka messages based on timestamp 【发布时间】:2020-11-18 06:12:23 【问题描述】:

我正在开发一个使用 kafka 的应用程序,而技术是 scala。我的kafka消费者代码如下:

val props = new Properties()
        props.put("group.id", "test")
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("auto.offset.reset", "earliest")
        props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(topic))
    val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

它给了我所有的记录,但问题是我已经在 kafka 消费者中有数据,这可能导致重复数据意味着具有相同键的数据已经存在于主题中。有什么方法可以让我从特定时间检索数据。如果我可以计算当前时间并仅检索那些在该时间之后出现的记录,则意味着在轮询之前。我有什么办法可以做到这一点?

【问题讨论】:

您是否在寻找latest 选项-props.put("auto.offset.reset", "latest") no latest 对我没有用,因为我已经有主题中的数据 我已经在 kafka 消费者中有数据你能详细说明一下这个问题吗?一旦consumer group 中的consumer 阅读了消息Kafkacommit 消息并且应该阅读only once by consumer。你是怎么得到重复的? 我可能有重复的键。我的制作人每次都会使用相同的密钥发送数据。我正在使用已经有数据的主题中的数据。我只是订阅它并阅读数据 @PrathapReddy 有有效的用例用于多次读取/处理 【参考方案1】:

使用任何给定时间戳的唯一方法是

    查找offsetsForTimes seekcommitSync 那个结果 开始投票

但是,你需要注意数据流是连续的,以后可能还会有重复的key。


如果您在数据中拥有相同的键,并且您只想查看最新的,那么您最好使用 KTable

【讨论】:

抱歉,不想复制您的答案...只是花了一些时间来实际编写和测试我的代码。 @mike 没关系。我在使用所有 Futures 类之前编写了代码,所以它比你的要混乱得多【参考方案2】:

您可以在 KafkaConsumer API 中使用 offsetsForTimes method。

代码

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

object OffsetsForTime extends App 

  implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
    offsetQuery
      .map  case (tp, time) => tp -> new java.lang.Long(time) 
      .asJava

  val topic = "myInTopic"
  val timestamp: Long = 1595971151000L

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(topicPartition))
  // dummy poll before calling seek
  consumer.poll(Duration.ofMillis(500))

  // get next available offset after given timestamp
  val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head
  // seek to offset
  consumer.seek(topicPartition, offsetAndTimestamp.offset())

  // poll data
  val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

  for (data <- record) 
    println(s"Timestamp: $data.timestamp(), Key: $data.key(), Value: $data.value()")
  


测试

./kafka/current/bin/kafconsole-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
CreateTime:1595971142560    1_old
CreateTime:1595971147697    2_old
CreateTime:1595971150136    3_old
CreateTime:1595971192649    1_new
CreateTime:1595971194489    2_new
CreateTime:1595971196416    3_new

将时间戳选择为3_old1_new 之间的时间,以仅使用“新”消息。

输出

Timestamp: 1595971192649, Key: null, Value: 1_new
Timestamp: 1595971194489, Key: null, Value: 2_new
Timestamp: 1595971196416, Key: null, Value: 3_new

【讨论】:

以上是关于如何根据时间戳获取Kafka消息的主要内容,如果未能解决你的问题,请参考以下文章

kafka为什么有的消息的时间戳的值是-1

Kafka中的索引机制

如何从 Smack 4.1 中的实时消息中获取服务器时间戳

如何获取带有发件人时间的时间戳消息

如何获取带有时间戳 XMPP iOS 的消息?

如何根据来自firebase的时间戳获取排序数据[重复]