监控Kafka消费进度

Posted eugene0

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了监控Kafka消费进度相关的知识,希望对你有一定的参考价值。

使用Kafka作为消息中间件消费数据时,监控Kafka消费的进度很重要。其中,在监控消费进度的过程中,主要关注消费Lag。

常用监控Kafka消费进度的方法有三种,分别是使用Kafka自带的命令行工具、使用Kafka Consumer API和Kafka自带的JMX监控指标,这里介绍前两种方法。
注: 内网IP:10.12.100.126 10.12.100.127 10.12.100.128 外网IP:47.90.133.76 47.90.133.77 47.90.133.78 用户名:server1 server2 server3

1 使用kafka自带的命令行工具

针对Kafka高级消费API,使用kafka自带的命令行工具kafka-consumer-groups.sh脚本直接查看Kafka消费进度

1.1 列出存在的所有消费者组

(base) root@node3:/opt/kafka/kafka_2.11-0.10.2.2/bin#  kafka-consumer-groups.sh new-consumer --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
consumer
consumers

1.2 使用kafka-consumer-groups.sh查看消费进度

(base) root@node3:/opt/kafka/kafka_2.11-0.10.2.2/bin# kafka-consumer-groups.sh --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --describe --group consumers
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                        0          17734           17734           0          consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388   /47.90.133.76                  consumer-1
test                        1          17736           17736           0          consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388   /47.90.133.76                   consumer-1
test                        2          17735           17735           0          consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388   /47.90.133.76                  consumer-1

GROUP TOPIC PID OFFSET LOGSIZE LAG
消费者组 话题id 分区id 当前已消费的条数 总条数 未消费的条数

注意:LAG的单位时消息条数,LAG为0,表示消费者实时消费生产者产生的消息,无滞后;LAG越大,表示消费者不能及时消费生产者生产的消息,有滞后。

2 使用kafka Consumer API

from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition
def get_topic_offset(brokers, topic):
    client = SimpleClient(brokers)
    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in  partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)
    return sum([r.offsets[0] for r in offsets_responses])
def get_group_offset(brokers, group_id, topic):
    consumer = KafkaConsumer(bootstrap_servers=brokers,
                             group_id=group_id,
                             )
    pts = [TopicPartition(topic=topic, partition=i) for i in
           consumer.partitions_for_topic(topic)]
    result = consumer._coordinator.fetch_committed_offsets(pts)
    return sum([r.offset for r in result.values()])
if __name__ == '__main__':
    topic_offset =  get_topic_offset("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "test")
    group_offset =  get_group_offset("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "consumers", "test")
    lag = topic_offset - group_offset
    print(topic_offset) # topic的offset总和
    print(group_offset) # topic特定group已消费的offset的总和
    print(lag) # 未消费的条数

(base) root@server3:~# python  getKafkaLag.py
17735
17735
0

代码参考:https://www.jianshu.com/p/e48af92e199d

以上是关于监控Kafka消费进度的主要内容,如果未能解决你的问题,请参考以下文章

flink自定义metrics监控kafka消费

kafka将消费进度重置到最新的位置

Kafka1.0.0如何查看队列消费进度

kafka通过java api 获取当前消费组offset/logsize/lag信息,实现消费延迟监控

kafka通过java api 获取当前消费组offset/logsize/lag信息,实现消费延迟监控

zabbix监控kafka消费