如何获取 kafka 主题分区的最新偏移量?

Posted

技术标签:

【中文标题】如何获取 kafka 主题分区的最新偏移量?【英文标题】:How to get latest offset for a partition for a kafka topic? 【发布时间】:2016-05-27 17:14:30 【问题描述】:

我正在为 Kafka 使用 Python 高级消费者,并且想知道主题的每个分区的最新偏移量。但是我无法让它工作。

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()

但我得到的输出是

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None

我有另一种使用 assign 的方法,但结果是一样的

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()

从一些文档看来,如果没有发出 fetch,我可能会得到这种行为。但我找不到强制执行的方法。我做错了什么?

或者是否有不同/更简单的方法来获取主题的最新偏移量?

【问题讨论】:

不是 100% 肯定的,但我认为您的代码在 kafka-python 实际连接到代理之前返回高水位值。由于KafkaConsumer 是异步的,我认为您必须实际使用一条消息才能填充高水位值:github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 【参考方案1】:

使用confluent-kafka-python

你可以使用position:

检索分区列表的当前位置(偏移量)。

from confluent_kafka import Consumer, TopicPartition


consumer = Consumer("bootstrap.servers": "localhost:9092")
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in list(topic.topics['topicName'].partitions.keys())] 

offset_per_partition = consumer.position(partitions)

或者,您也可以使用get_watermark_offsets,但您必须一次传递一个分区,因此需要多次调用:

检索分区的低偏移量和高偏移量。

from confluent_kafka import Consumer, TopicPartition


consumer = Consumer("bootstrap.servers": "localhost:9092")
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in list(topic.topics['topicName'].partitions.keys())] 

for p in partitions:
    low_offset, high_offset = consumer.get_watermark_offsets(p)
    print(f"Latest offset for partition p: high_offset")

使用kafka-python

你可以使用end_offsets:

获取给定分区的最后一个偏移量。 a的最后一个偏移量 partition 是即将到来的消息的偏移量,即 最后一条可用消息 + 1。

此方法不会改变当前消费者位置 分区。

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer


consumer = KafkaConsumer(bootstrap_servers = "localhost:9092" )
partitions= = [TopicPartition('myTopic', p) for p in consumer.partitions_for_topic('myTopic')]
last_offset_per_partition = consumer.end_offsets(partitions)

【讨论】:

Exception has occurred: TypeError expected cimpl.TopicPartition 这两个库中的哪一个? confluent_kafka @DachuanZhao 哪条线路导致了问题?【参考方案2】:

使用kafka-python>=1.3.4,您可以使用:

kafka.KafkaConsumer.end_offsets(partitions)

获取给定分区的最后一个偏移量。分区的最后一个偏移量是即将到来的消息的偏移量,即最后一条可用消息的偏移量+1。

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.end_offsets(ps)

【讨论】:

【参考方案3】:

实现这一点的另一种方法是轮询消费者以获取上次消费的偏移量,然后使用 seek_to_end 方法获取最近可用的偏移量分区。

from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
                     group_id='my-group',
                     bootstrap_servers=['localhost:9092'])
consumer.poll()
consumer.seek_to_end()

这种方法在使用消费者组时特别有用。

来源:

    https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end

【讨论】:

我的服务器有数百条消息,但 consumer.poll() 返回了 如果您运行的消费者实例多于该主题的分区数,则可能会发生这种情况。 好点。事后我能够确定我们没有调用 .close,所以这种情况发生了,但我们认为只有 1 个。【参考方案4】:
from kafka import KafkaConsumer, TopicPartition

TOPIC = 'MYTOPIC'
GROUP = 'MYGROUP'
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']

consumer = KafkaConsumer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=GROUP,
        enable_auto_commit=False
    )


for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    consumer.assign([tp])
    committed = consumer.committed(tp)
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed)))

consumer.close(autocommit=False)

【讨论】:

正如我所见,这个问题仍然引起我的注意,我想解释一下,而我上面的回答并没有真正回答我认为主题/分区的最后偏移量仅在以下情况下相关的问题一个消费群体。 kafka 是为许多消费群体从相同主题消费相同数据而构建的,我认为重要的是群体的消费率或更重要的是滞后。【参考方案5】:

在花了一天的时间和几次错误的开始之后,我终于找到了一个解决方案并让它发挥作用。发给她,让其他人可以参考。

from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload

client = SimpleClient(brokers)

partitions = client.topic_partitions[topic]
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print "partition = %s, offset = %s"%(r.partition, r.offsets[0])

【讨论】:

有没有办法获取每个消费者/组每个分区的当前/下一个偏移量? 遗憾的是,SimpleClient 已被弃用,并且上面的 offsets_responses 会产生 FailedPayloadsError: FailedPayloadsError @dreynold 它对我有用,但 Itamar Lavender 使用下面未弃用的部分的回答也适用。如果您还没有组,请跳过“滞后”部分,这也可以。【参考方案6】:

如果您希望使用 kafka/bin 中存在的 Kafka shell 脚本,那么您可以使用 kafka-run-class.sh 获取最新和最小的偏移量。

获取最新的偏移量命令如下所示

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname

获取最小偏移量的命令如下所示

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname

您可以从以下link 中找到有关 Get Offsets Shell 的更多信息

希望这会有所帮助!

【讨论】:

最简单的解决方案??

以上是关于如何获取 kafka 主题分区的最新偏移量?的主要内容,如果未能解决你的问题,请参考以下文章

获取主题和分区偏移量

如何重置 Kafka 偏移量以匹配尾部位置?

什么命令显示 Kafka 中分区的所有主题和偏移量?

如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?

golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费

Kafka consumerGroup 丢失了所有分区中提交的偏移量信息,并从头开始消费偏移量