kafka 基础概念命令行操作(查看所有topic创建topic删除topic查看某个Topic的详情修改分区数发送消息消费消息 查看消费者组 更新消费者的偏移位置)
Posted 但行益事莫问前程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 基础概念命令行操作(查看所有topic创建topic删除topic查看某个Topic的详情修改分区数发送消息消费消息 查看消费者组 更新消费者的偏移位置)相关的知识,希望对你有一定的参考价值。
文章目录
前言
1. 基础概念
Broker
一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成;
Producer
生产者,即向kafka的broker-list发送消息的客户端;
Consumer
消费者,即向kafka的broker-list订阅消息的客户端;
Consumer Group
消费者组是逻辑上的一个订阅者
,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
;所有的消费者都属于某个消费者组,消费者组之间互不影响;
Topic
可理解为一个队列,生产者和消费者均面向一个topic;
Partition
一个topic可以分为多个partition,每个partition是一个有序的队列
;这使得一个topic可分布到多个broker(即服务器)上,
Replica
副本,kafka提供了副本机制,一个topic的每个分区都有若干个副本,即一个leader和若干个follower,从而保证集群单点故障时节点上的partition数据不丢失
leader:每个分区多个副本的主
,生产者发送数据的对象,以及消费者消费数据的对象都是leader
follower:从
,实时从leader中同步数据,保持和leader数据的同步
。leader发生故障时,某个follower会成为新的leader
2. 命令行操作
2.1 查看所有topic
./kafka-topics.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --list
2.2 创建topic
./kafka-topics.sh --bootstrap-server 192.168.0.23:29092,192.168.0.110:29092 --create --replication-factor 3 --partitions 3 --topic
charges
--topic
定义topic名
--replication-factor
定义副本数
--partitions
定义分区数
2.3 删除topic
./kafka-topics.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --delete --topic charges
2.4 查看某个Topic的详情
./kafka-topics.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --describe --topic charges
2.5 修改分区数
./kafka-topics.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --alter --partitions 6 --topic charges
2.6 发送消息
./kafka-console-producer.sh --broker-list 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --topic charges
2.7 消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --from-beginning --topic charges --group htsc
--from-beginning
:若消费者没有offset,则从 the earliest message present in the log
而非默认的 the latest message
--group
: 指定消费者所属的消费者组
注:默认情况下,若zookeeper或__consumer_offsets
中有offset记录,则Kafka消费者从最后一次提交的偏移量位置(offset)开始消费消息
。若之前没有提交过偏移量,则消费消息开始位置取决于Topic的配置属性auto.offset.reset
的设置,默认为最新(latest 仅消费新消息
)
2.8 查看消费者组
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --list
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --describe --group htsc
注: 通过查看消费者组,可查看到topic所有分区当前的偏移量以及LOG-END-OFFSET(最后的偏移量)
,即消费消息的现状
2.9 更新消费者的偏移位置
将topic charges对应的消费组htsc 的分区编号0的偏移量设置为最早
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092 --topic charges:0 --group htsc --reset-offsets --to-earliest --execute
--reset-offsets
Reset offsets of consumer group
--to-earliest
Reset offsets to earliest offset
--to-latest
Reset offsets to latest offset.
--to-offset <Long: offset>
Reset offsets to a specific offset
以上是关于kafka 基础概念命令行操作(查看所有topic创建topic删除topic查看某个Topic的详情修改分区数发送消息消费消息 查看消费者组 更新消费者的偏移位置)的主要内容,如果未能解决你的问题,请参考以下文章