kafka-python KafkaConsumer 多分区提交偏移量

Posted

技术标签:

【中文标题】kafka-python KafkaConsumer 多分区提交偏移量【英文标题】:kafka-python KafkaConsumer multiple partition commit offset 【发布时间】:2018-01-24 18:24:48 【问题描述】:

是否有可能将偏移量提交到具有多个分区的 Kafka 主题,因此可以将偏移量 1 提交到分区 1,将偏移量 2 提交到 p2 等等?

编辑:

是的,有可能:

consumer = KafkaConsumer()
topicpartitions = [TopicPartition('topicname', partitionId) for partitionId in consumer.partitions_for_topic('topicname')]

consumer.assign(topicpartitions)
for tp in topicpartitions:
    consumer.commit(tp: OffsetAndMetadata(1000, None))

for msg in consumer:
    #do whatever

【问题讨论】:

【参考方案1】:

Kafka 偏移量始终是每个分区的。我的意思是,如果您的主题有 2 个分区,则 p0 中的消息将从偏移量 0 开始,并为每条新消息增加我的 1。 p1 相似度中的消息从偏移量 0 开始并增加 1。

因此,如果您发布了两条消息(没有密钥),一条将进入分区 0,偏移量为 0,另一条进入分区 1,偏移量为 0。

现在,如果另一个应用程序正在使用该主题并提交其偏移量,那么它将向 __consumer_offsets 主题生成消息,其中包括其 group.id、主题、分区号和偏移量。例如,类似 "myconsumerid","mytopic",P0,1 和 "myconsumerid","mytopic",P1,1。

如果应用停止并且一个或两个其他使用者从相同的 group.id 开始,他们将从分配给他们的分区的最后提交的偏移量继续。

如果您想将消费者偏移量重新定位到任何其他位置,您可以使用 0.11 Kafka 工具更改组的已提交偏移量

bin/kafka-consumer-groups.sh --reset-offsets

如果你给它正确的标志,这个工具将允许你独立设置每个分区的偏移量。

如果您愿意,可以从 Python 程序中调用此工具。消费者组中的所有现有消费者应首先关闭,否则他们可能会覆盖偏移量。

如果你想编写这个工具的 Python 版本而不是运行现有的 CLI 命令,你需要找到一个支持 seek() 的 Python 客户端,或者你可以将偏移量更改为你想要的,然后提交当消费应用程序重新启动时,它们处于该位置。另一种方法是放弃动态分区分配并手动分配()要更改的分区并将偏移量提交到分配的列表。您不能在同一个应用程序中同时使用动态管理的分区订阅和手动分配的分区。

您还需要确保在这些分区上使用相同使用者组的所有其他使用者都已关闭,否则一旦其他使用者自动提交或手动提交偏移量,已提交的偏移量将被其他使用者覆盖你刚刚设置好了。

【讨论】:

您的回答与问题无关。问题是是否可以在 python 中向 kafka 主题分区提交偏移量。因此,假设您有一个 12 个分区的主题,并且您想向 kafka 提交 12 个 k:v 对的字典,其中 k=partition 和 v=offset 值,因此下次您开始使用偏移量存储在该主题中的消息时你之前手动提交的。例如主题有 10000 条消息平均分布在 10 个分区中。你想从手动强制偏移开始。 您不能用任何语言编写消息并分配您自己的偏移量。偏移量由经纪人生成,并且总是单调递增的。生成消息会自动创建偏移量。消费和调用 commitSync() 是提交消费者偏移量的原因。您是否在问是否可以将消费者现有的已提交偏移重新定位为其他内容,因为这可以通过管理工具实现。 Kafka 将消息存储在主题中。每条消息都有一个偏移量。 KafkaConsumer 可以将偏移量提交回 kafka。 Kafka 也可以存储有关已提交偏移量的信息。您可以强制 KafkaConsumer 从最早或最新的偏移量或特定的偏移量值消费。我的问题是:如何强制 KafkaConsumer 使用来自 kafka 主题的消息,其中多个分区来自为每个分区手动设置的特定偏移量。 我想帮助你,你的问题有点模棱两可,所以如果我的解释有误,请不要生气。您应该询问是否可以使用 Python 为消费者组“重置”已提交的偏移量。您没有说您使用的是 3 个 Kafka Python 客户端中的哪一个。你没有说你使用的是什么版本的 Kafka。 不,我不想重置偏移量。您可以通过更改 group_id 来简单地重置偏移量。我确实说哪个客户端-> 只是阅读问题:它说'kafka-python',这是客户端的名称。 kafka 版本 >0.8.2

以上是关于kafka-python KafkaConsumer 多分区提交偏移量的主要内容,如果未能解决你的问题,请参考以下文章

如何在kafka-python和confluent-kafka之间做出选择

kafka-python 消费者未收到消息

kafka-python消费者读取数据时自定义偏移量,自定义数据读取的顺序

kafka-python消息读写操作kafka,python,Windows

Kafka SASL/PLAIN加密 及Kafka-Python整合

如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?