如何根据时间戳获取Kafka消息
Posted
技术标签:
【中文标题】如何根据时间戳获取Kafka消息【英文标题】:How to get Kafka messages based on timestamp 【发布时间】:2020-11-18 06:12:23 【问题描述】:我正在开发一个使用 kafka 的应用程序,而技术是 scala。我的kafka消费者代码如下:
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(topic))
val record = consumer.poll(Duration.ofMillis(500)).asScala.toList
它给了我所有的记录,但问题是我已经在 kafka 消费者中有数据,这可能导致重复数据意味着具有相同键的数据已经存在于主题中。有什么方法可以让我从特定时间检索数据。如果我可以计算当前时间并仅检索那些在该时间之后出现的记录,则意味着在轮询之前。我有什么办法可以做到这一点?
【问题讨论】:
您是否在寻找latest 选项-props.put("auto.offset.reset", "latest")
?
no latest 对我没有用,因为我已经有主题中的数据
我已经在 kafka 消费者中有数据你能详细说明一下这个问题吗?一旦consumer group
中的consumer
阅读了消息Kafka
将commit
消息并且应该阅读only once by consumer
。你是怎么得到重复的?
我可能有重复的键。我的制作人每次都会使用相同的密钥发送数据。我正在使用已经有数据的主题中的数据。我只是订阅它并阅读数据
@PrathapReddy 有有效的用例用于多次读取/处理
【参考方案1】:
使用任何给定时间戳的唯一方法是
-
查找
offsetsForTimes
seek
和 commitSync
那个结果
开始投票
但是,你需要注意数据流是连续的,以后可能还会有重复的key。
如果您在数据中拥有相同的键,并且您只想查看最新的,那么您最好使用 KTable
【讨论】:
抱歉,不想复制您的答案...只是花了一些时间来实际编写和测试我的代码。 @mike 没关系。我在使用所有 Futures 类之前编写了代码,所以它比你的要混乱得多【参考方案2】:您可以在 KafkaConsumer API 中使用 offsetsForTimes method。
代码
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._
object OffsetsForTime extends App
implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
offsetQuery
.map case (tp, time) => tp -> new java.lang.Long(time)
.asJava
val topic = "myInTopic"
val timestamp: Long = 1595971151000L
val props = new Properties()
props.put("group.id", "group-id1337")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
val topicPartition = new TopicPartition(topic, 0)
consumer.assign(java.util.Collections.singletonList(topicPartition))
// dummy poll before calling seek
consumer.poll(Duration.ofMillis(500))
// get next available offset after given timestamp
val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head
// seek to offset
consumer.seek(topicPartition, offsetAndTimestamp.offset())
// poll data
val record = consumer.poll(Duration.ofMillis(500)).asScala.toList
for (data <- record)
println(s"Timestamp: $data.timestamp(), Key: $data.key(), Value: $data.value()")
测试
./kafka/current/bin/kafconsole-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
CreateTime:1595971142560 1_old
CreateTime:1595971147697 2_old
CreateTime:1595971150136 3_old
CreateTime:1595971192649 1_new
CreateTime:1595971194489 2_new
CreateTime:1595971196416 3_new
将时间戳选择为3_old
和1_new
之间的时间,以仅使用“新”消息。
输出
Timestamp: 1595971192649, Key: null, Value: 1_new
Timestamp: 1595971194489, Key: null, Value: 2_new
Timestamp: 1595971196416, Key: null, Value: 3_new
【讨论】:
以上是关于如何根据时间戳获取Kafka消息的主要内容,如果未能解决你的问题,请参考以下文章