Kafka详解(上)——消息系统分类Kafka安装两种启动基本概念两种架构核心配置文件
Posted 98seven
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka详解(上)——消息系统分类Kafka安装两种启动基本概念两种架构核心配置文件相关的知识,希望对你有一定的参考价值。
1 消息和消息系统
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
1-1 消息系统是什么
消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,而不必担心如何共享数据。分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息传递系统之间异步排队。有两种类型的消息传递模式可用:一种是点对点,另一种是发布-订阅 (pub-sub) 消息传递系统。大多数消息传递模式都遵循pub-sub。
消息系统分类
-
Peer-to-Peer(点对点,一对一)
- 一般基于Pull接受消息
- 发送到队列中的消息被一个且仅仅一个接收者所接收,即便有多个接收者在同一队列中侦听同一消息
- 支持异步“即发即弃”的消息传递方式,也支持同步请求/应答传送方式
-
发布/订阅(一对多)
- 发布到一个主题的消息可以被多个订阅者接收
- 发布/订阅模式既可基于Push消费数据,也可基于Pull消费数据
- 解耦能力比P2P模型更强
Kafka
为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group
)。 消费者用一个消费者组名标记自己。 一个发布在Topic
上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了P2P模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
1-2 为什么要使用消息系统
- 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 - 易扩展
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 - 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 - 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。 - 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1-3 常用消息队列对比
-
RabbitMQ
RabbitMQ是使用`Erlang`编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常**重量级**,更适合于企业级的开发。对路由,负载均衡或者数据持久化都有很好的支持。同时支持peer-to-peer和发布/订阅模式。
-
Redis
Redis是一个基于Key-Value对的NoSQL数据库,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,可以当做一个轻量级的队列服务来使用。 Redis实现轻量级的消息队列与消息中间件相比,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而且对消息的可靠性不是那么严格可以尝试使用。
-
ZeroMQ
轻量级,不需要单独的消息服务器或中间件,应用程序将扮演这个服务器角色,它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高。特点是高性能、跨平台(支持Linux、Windows、OS X等)、多语言支持(C、C++、Java、Python等30多种语言)、可单独部署或集成到应用中使用。
-
Kafka
Kafka是一个高性能跨语言分布式**发布/订阅**消息队列系统,具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到`100W/s`的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。
一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。
-
RocketMQ
RocketMQ是阿里开源的消息中间件,纯Java实现的发布/订阅消息系统,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是简单的复制,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里多次双十一活动。
因为是阿里内部从实践到产品的产物,因此里面很多接口、api并不是很普遍适用。可靠性毋庸置疑,而且与Kafka一脉相承(甚至更优),性能强劲,支持海量堆积。
横向对比
**
特性 | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|
PRODUCER-COMSUMER | 支持 | 支持 | 支持 |
PUBLISH-SUBSCRIBE | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | - | 支持 |
API完备性 | 高 | 高 | 低(静态配置) |
多语言支持 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级 | 百万级 | 万级 |
消息延迟 | 微秒级 | 毫秒级 | - |
可用性 | 高 | 非常高(分布式) | 高 |
消息丢失 | 低 | 理论上不会丢失 | - |
文档的完备性 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 无 |
首次部署难度 | 低 | 中 | 高 |
2 kafka流处理平台
官网:https://kafka.apache.org/
Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
2-1 kafka安装
1.安装:下载二进制版本
2.yum install lrzsz -y
3.检查是否安装成功:rpm -qa |grep lrzsz
4.执行rz
命令,打开本地选择文件对话框,选择文件
5.查看是否上传成功:
也可以在Linux中直接使用如下命令下载:
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz --no-check-certificate
6.解压:
tar xzf kafka_2.13-3.0.0.tgz
2-2 启动Kafka
(1)基于zookeeper启动
首先需要启动zookeeper,可以先通过wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
命令下载最新版本zookeeper,搭建zookeeper集群。但Kafka内置了一个zookeeper,也可以使用这个自带的zookeeper,以下演示是使用Kafka自带的zookeeper进行启动。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QNZmDuuQ-1650887263842)(Kafka.assets/image-20211015152747197.png)]
再启动Kafka服务:
bin/kafka-server-start.sh -daemon config/server.properties
启动 Kafka Broker 后,在终端上输入命令jps,将看到以下响应 :
现在可以看到在终端上运行了两个守护进程,其中 QuorumPeerMain
是 ZooKeeper
守护进程,另一个是 Kafka
守护进程。
(2)基于Kraft启动
首先使用kafka-storage.sh
为新集群生成ID:
./bin/kafka-storage.sh random-uuid
下一步是格式化存储目录。如果在单节点模式下运行,可以通过如下命令执行此操作。如果使用多个节点,则应在每个节点上运行format
命令,确保对每个节点使用相同的群集ID。
./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
最后,启动Kafka服务:
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
创建一个topic:
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --create --topic topic-test-kraft --partitions 1 --replication-factor 1
2-3 基本概念
-
Producer
消息和数据的生产者,向Kafka的一个Topic发布消息的进程/代码/服务。
-
Consumer
消息和数据的消费者,订阅Topic并且处理其发布的消息的进程/代码/服务。
-
Consumer Group
逻辑概念,由多个 consumer 组成。消费者组是Kafka实现单播和广播两种消息模型的手段。对于同一个topic,会广播给不同的消费组,但一个消费组中只有一个消费者可以消费该消息,消费者组之间互不影响。消费者组中的每个消费者,都会实时记录自己消费到哪个offset,以便在出错恢复中,从上次的位置继续消费。
-
Broker
服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。
-
Topic
- 逻辑概念,kafka消息的类别,对数据进行分区、隔离;
- 同一个Topic的消息可分布在一个或多个Broker上;
- 一个Topic包含一个或多个Partition;
- Producer发布数据时,必须指定将该消息发布到哪一个Topic
- Consumer订阅消息消息时,也必须指定订阅哪个Topic的消息
-
Partition
- 物理概念,Kafka下数据存储的基本单元。一个Topic数据会被分散存储到多个Partition,每个Partion是有序的
- 一个partition只分布于一个Broker上(不考虑备份)
- 一个Partition物理上对应一个文件夹
- 一个Partition包含多个Segment,一个Segment对应三个文件
- Segment由一个个不可变记录组成
- 记录只会被append到Segment中,不会被单独删除或者修改
- 清除过期日志时,直接删除一个或多个Segment(默认情况,Kafka会将数据保留168小时,也可以设置每个分区所能保存数据的大小,超过这个大小之后将旧数据删除掉)
kafka采用了分片和索引机制,将每个partition分为多个segment。segment包括偏移量索引文件、日志文件、时间戳索引文件(如果需要按照时间来定位消息的offset,会先在这个文件里查找)以及可能的其他可能文件(比如事务索引文件)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:test这个topic有三个分区,则对应的文件夹为test-0,test-1,test-2。
index和log文件以当前segment的第一条消息的offset命名,一个segment默认存储1GB。
每条消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset),索引文件中存储的是每个消息的起始偏移量和消息大小,log文件存储数据信息,索引文件中的元数据指向对应数据文件中消息的物理偏移地址。
如下图所示,以 “.index” 索引文件中的元数据 [3, 348] 为例,在 “.log” 数据文件表示第 3 个消息,即在全局 partition 中表示 170410+3=170413个消息,该消息的物理偏移地址为 348。
offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
如上图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。
每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
通常情况下,越多的partition会带来越高的吞吐量,但是同时也会给broker节点带来相应的性能损耗和潜在风险,虽然这些影响很小,但不可忽略,因此需要根据自身broker节点的实际情况来设置partition的数量以及replica的数量。
-
Replica:Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。
- 当某个Topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)
- Replica的个数小于等于Broker数,即对每个Partition而言,每个Broker上最多有一个副本,因此可用Broker ID表示Replica
- 所有Partiton的Replica默认情况会均匀分布到所有Broker上
-
Leader:每个分区多个副本的"主",生产者发送数据的对象,以及消费者消费数据的对象都是leader。
-
Follower:每个分区多个副本的"从",实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,通过推举算法某个follower就会成为新的leader。
kafka采用的基于领导者的副本机制工作原理如下:
同一分区的不同副本中保存的是相同的消息,副本之间是“一主多从”的关系。在 Kafka 中,追follower副本是不对外提供服务的,任何一个follower副本都不能响应消费者和生产者的读写请求。所有的请求都必须由leader副本来处理,或者说,所有的读写请求都必须发往leader者副本所在的 Broker,由该 Broker 负责处理。follower副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。
如上图所示,Kafka 集群中有4个 broker,某个主题中有3个分区,且副本因子(即副本个数)也为3,如此每个分区便有1个 leader 副本和2个 follower 副本。生产者和消费者只与 leader 副本进行交互,而 follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后。
- ISR(In-sync Replicas):ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
我们首先要明确的是,Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
另外,能够进入到 ISR 的追随者副本要满足一定的条件: Follower 副本能够落后 Leader 副本的最长时间间隔replica.lag.time.max.ms
,超出这个时间就不会放到ISR中。
2-4 Kafka架构
1. 基于zookeeper架构
如上图所示,Kafka集群中会有一个broker被选举为Controller,负责跟Zookeeper
交互,管理整个Kafka集群中所有分区和副本的状态。其他broker负责监听Controller节点的数据变化。Controller
的选举工作依赖Zookeeper
,选举成功后,Zookeeper
会创建一个/controller
临时节点。
基于 Kafka-ZooKeeper 的分布式消息队列系统总体架构如下:如下图所示,一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
- Broker注册
Zookeeper
用一个专门节点保存Broker
服务列表,也就是/brokers/ids
。broker
在启动时,向Zookeeper
发送注册请求,Zookeeper
会创建这个broker
节点,节点路径为/brokers/ids/broker.id
;并保存broker
的IP
地址和端口。
这个节点属性为临时节点,一旦
broker
宕机,zookeeper将删除这个临时节点。
可以通过zookeeper-shell.sh脚本查看zookeeper中存储的数据:
2. Topic注册
在Kafka中,所有topic与broker的对应关系都由ZooKeeper进行维护,在ZooKeeper中,会为topic分配一个单独节点。每个topic
都会以/brokers/topics/[topic_name]
的形式记录在Zookeeper
。
一个topic
的消息会被保存到多个partition
,这些partition
跟broker
的对应关系也需要保存到Zookeeper
。
partition
是多副本保存的,当leader
副本所在的broker发生故障时,partition
需要重新选举leader
,这也是由Zookeeper
主导完成。
3. 消费者注册
当新的消费者组注册到 ZooKeeper 中时,ZooKeeper 会创建专用的节点来保存相关信息,其节点路径为 /consumers/group_id
,其节点下有三个子节点,分别为 [ids, owners, offsets]。
- ids 节点:记录该消费组中当前正在消费的消费者;
- owners 节点:记录该消费组消费的 topic 信息;
- offsets 节点:记录每个 topic 的每个分区的 offset。
每个消费者都要关注其所属消费者组中消费者数目的变化,即监听 /consumers/group_id/ids
下子节点的变化。一单发现消费者新增或减少,就会触发消费者的负载均衡。
4. 负载均衡
broker
向Zookeeper
进行注册后,生产者根据broker
节点来感知broker
服务列表变化,这样可以实现动态负载均衡。
consumer group
中的消费者,可以根据topic
节点信息来拉取特定分区的消息,实现负载均衡。
5. Controller选举
Kafka
集群中会有一个broker
被选举为Controller
负责跟Zookeeper
进行交互,它负责管理整个Kafka
集群中所有分区和副本的状态。其他broker
监听Controller
节点的数据变化。
Controller
的选举工作依赖于Zookeeper
,选举成功后,Zookeeper
会创建一个/controller
临时节点。
Controller
选举成功后,会从Zookeeper
集群中拉取一份完整的元数据初始化ControllerContext
,这些元数据缓存在Controller
节点。当集群发生变化时,比如增加topic
分区,Controller
不仅需要变更本地的缓存数据,还需要将这些变更信息同步到其他Broker
。
Controller
监听到Zookeeper
事件、定时任务事件和其他事件后,将这些事件按照先后顺序暂存到LinkedBlockingQueue
中,由事件处理线程按顺序处理,这些处理多数需要跟Zookeeper
交互,Controller
则需要更新自己的元数据。
6. 记录消费进度Offset
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的消费进度 Offset 记录到 ZooKeeper上,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。
注意:Kafka已推荐将consumer的Offset信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groups.sh 脚本供用户查看consumer信息。
7. 记录Partition和Consumer的关系
consumer group 下有多个 consumer(消费者),对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的 group ID
,group 内部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其它 group)。同时,Kafka 为每个消费者分配一个 consumer ID,通常采用 hostname:UUID 形式表示。
在Kafka中,规定了每个 partition 只能被同组的一个消费者进行消费,因此,需要在 ZooKeeper 上记录下 partition 与 consumer 之间的关系,每个 consumer
一旦确定了对一个 partition
的消费权力,需要将其 consumer ID 写入到 ZooKeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id] 就是一个消息分区的标识,节点内容就是该消息分区消费者的 consumer ID。
基于zookeeper模式的最重要的配置文件server.properties的配置详解如下:
############################# Server Basics #############################
#每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。一般采用ip的后三位来用来标识是哪台kafka的broker,利于定位和排错
broker.id=0
############################# Socket Server Settings #############################
#broker服务器要监听的地址及端口;默认是localhost:9092;0.0.0.0的话,表示监听本机的所有ip地址。
#格式为<协议名称,主机名,端口号>,这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用SSL或TLS加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092。
#内网连接服务的话,就只需要listener配置。
listeners=PLAINTEXT://192.168.8.128:9092
#暴露给外部的listeners,如果没有设置,会用listeners。对公网提供服务就需要配置 advertied.listener。
advertised.listeners=PLAINTEXT://192.168.8.128:9092
#broker处理网络请求的线程数量,也就是接收消息的线程数,一般情况下数量为cpu核数
num.network.threads=8
#消息从内存中写入磁盘时使用的线程数量,broker处理磁盘IO的线程数,数值为cpu核数2倍
num.io.threads=16
#发送缓冲区大小
socket.send.buffer.bytes=102400
#接收缓冲区大小
socket.receive.buffer.bytes=102400
#请求的最大数值,防止内存溢出
socket.request.max.bytes=104857600
############################# Log Basics #############################
#kafka数据的存放地址,多个地址的话用逗号分隔: /data/kafka-logs-1,/data/kafka-logs-2
#比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
log.dirs=/opt/kafka_2.13-3.0.0/kafka-logs
#创建topic时默认的topic数,若创建时指定了分区数则以指定为准
num.partitions=1
#每个数据目录用于日志恢复启动和关闭时的线程
#注意,这个参数指的是每个日志目录的线程数,比如如果该参数设置为8,而log.dirs设置为了三个路径(多个路径由","分隔),则总共会启动24个线程来处理。
num.recovery.threads.per.data.dir=1
#一个topic,默认分区的replication(副本)个数,不能大于集群中broker的个数
offsets.topic.replication.factor=1
#事务主题的复制因子,设置更高以确保可用性。
transaction.state.log.replication.factor=1
#覆盖事务主题的min.insync.replicas配置。
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#消息记录数达到1000时flush一次数据到磁盘
log.flush.interval.messages=10000
#仅仅通过消息条数来控制消息的磁盘写入时机是不足的。如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到log.flush.interval.ms的值,也将触发。例如:log.flush.interval.ms=1000,表示每间隔1000毫秒flush一次数据到磁盘。
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
#日志清除策略——按时间,默认168小时(7天)
log.retention.hours=168
#topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。默认没有开启,如果设置该配置,和log.retention.hours满足任意一条件都会进行日志清除
log.retention.bytes=1073741824
# 一个片段存储数据的大小,默认1GB,超过这个大小将会创建新的segment
log.segment.bytes=1073741824
#文件大小检查的周期时间,是否触发日志清除中设置的策略
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#连接zookeeper的配置,连接地址和超时时间设置
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
#在执行第一次再平衡之前,group 协调员将等待更多消费者加入 group 的时间。
#延迟时间越长意味着重新平衡的可能性越小,但是等待处理开始的时间增加。
group.initial.rebalance.delay.ms=0
2.基于Kraft架构
Kafak 团队把通过 Raft 协议同步数据的方式 Kafka Raft Metadata mode
,简称 KRaft。
KRaft运行模式的Kafka集群 ,不会将元数据存储在 Apache ZooKeeper中,部署新集群的时候,无需部署ZooKeeper集群 ,因为 Kafka 将元数据存储在 Controller 节点的 KRaft Quorum中 。 KRaft可以带来很多好处,比如可以支持更多的分区,更快速的切换Controller,也可以避免Controller缓存的元数据和Zookeeper存储的数据不一致带来的一系列问题。
没有了ZK的辅助,Controller就要接手ZK的元数据存储,并且单点Controller失败会对集群造成破坏性的影响。因此,Controller会变为一个符合Quorum原则(过半原则)
的Broker集合(即Quorum Controller),在实际应用中要求Controller Quorum的节点数为奇数且大于等于3
,最多可以容忍(n / 2 - 1)个节点失败。当然,只有一个节点能成为领导节点即Active Controller,领导选举就依赖于内置的Raft协议变种(又称为KRaft)实现。按照介绍Raft的思路,首先来看看Controller Quorum节点的状态与转移规则。
Quorum节点状态机
在KRaft模式下,Quorum中的一个节点可以处于以下3种状态之一。
- Candidate(候选者)——主动发起选举;
- Leader(领导者)——在选举过程中获得多数票;
- Follower(跟随者)——已经投票给Candidate,或者正在从Leader复制日志;
领导选举
当满足以下三个条件之一时,Quorum中的某个节点就会触发选举:
1.向Leader发送请求后,在超时阈值quorum.fetch.timeout.ms之后仍然没有得到响应,表示Leader疑似失败;
2.从当前Leader收到了EndQuorumEpoch请求,表示Leader已退位;
3.Candidate状态下,在超时阈值quorum.election.timeout.ms之后仍然没有收到多数票,也没有Candidate赢得选举,表示此次选举作废,重新进行选举。
选举过程
下面以一个刚初始化的 Raft 集群为例:
- 初始状态
首先,在初始状态下,集群中所有的节点都是跟随者的状态。Raft每个节点初始化后的心跳超时时间都是随机的,如上所示,节点 C 的超时时间最短(120ms),任期编号都为 0,角色都是跟随者。
其实,Raft 算法巧妙地使用随机选举超时时间的方法,把超时时间都分散开来,在大多数
情况下只有一个服务器节点先发起选举,而不是同时发起选举,这样就能减少因选票瓜分导 致选举失败的情况。在 Raft 算法中,随机超时时间是有 2 种含义的:
- 跟随者等待领导者心跳信息超时的时间间隔,是随机的;
- 当没有候选人赢得过半票数,选举无效了,这时需要等待一个随机时间间隔,也就是说,等待选举超时的时间间隔是随机的。
-
请求投票
此时没有一个节点是领导者,节点等待心跳超时后,会推荐自己为候选人,向集群其他节点发起请求投票信息,此时任期编号 +1,自荐会获得自己的一票选票。
-
跟随着投票
跟随者收到请求投票信息后,在编号为 1 的这届任期内,也还没有进行过投票,那么它将把选票投给节点 A,同时更新任期编号为 1。(因为每个任期内跟随者只能投给先来的候选人一票,后面来的候选人则不能在投票给它了)
-
当选领导者
如果候选人在选举超时时间内赢得了大多数的选票,那么它就会成为本届任期内新的领导者。 -
领导者与跟随着保持心跳
领导者周期性发送心跳消息给其他节点,告知其他服务器自己是领导者,防止跟随者发起新的领导者选举。
关于任期
从以上的选举过程看,我们知道在 Raft 中的选举中是有任期机制的,顾名思义,每一任领导者,都有它专属的任期,当领导者更换后,任期也会增加,Raft 中的任期还要注意以下个细节:
-
如果某个节点,发现自己的任期编号比其他节点小,则会将自己的任期编号更新比自己更大的值;
-
从上面的选举过程看出,每次推荐自己成为候选人,都会得到自身的那一票;
-
如果候选人或者领导者发现自己的任期编号比其它节点好要小,则会立即更新自己为跟随者。
这点很重要,按照我的理解,这个机制能够解决同一时间内有多个领导者的情况,比如领导者 A 挂了之后,集群其他节点会选举出一个新的领导者 B,在节点 B 恢复之后,会接收来自新领导者的心跳消息,此时节点 A 会立即恢复成跟随者状态;
配置文件
Kraft目录下server.properties的配置文件详解如下:
#设置本服务器的角色
#如果Process.Roles = Broker, 服务器在KRaft模式中充当 Broker。
#如果Process.Roles = Controller, 服务器在KRaft模式下充当 Controller。
#如果Process.Roles = Broker,Controller,服务器在KRaft模式中同时充当 Broker 和Controller。同时充当Broker和Controller的节点称为“组合”节点。
#如果process.roles 没有设置。那么集群就假定是运行在ZooKeeper模式下。
process.roles=broker,controller
# 节点的ID,和节点所承担的角色相关联。不同于原来的broker.id,这个nodeid是用来投票用的
node.id=1
#这个配置标识有哪些节点是 Quorum 的投票者节点(选民)。所有想成为控制器的节点都需要包含在这个配置里面。这类似于在使用ZooKeeper时,使用ZooKeeper.connect配置时必须包含所有的ZooKeeper服务器。
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
#暴露给外部的listeners,如果没有设置会listeners的配置
advertised.listeners=PLAINTEXT://localhost:9092
#如果此broker的角色是Controller,则必须配置此选项,暴露给外部的名字
controller.listener.names=CONTROLLER
#配置监听者的安全协议
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#下面的配置和基于zookeeper模式的配置是一样的
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
2-5 为什么要弃用zookeeper
实际上,问题不在于ZooKeeper本身,而在于外部元数据管理的概念上。Kafka本身就是一个分布式系统,但是需要另一个分布式系统来管理,复杂性无疑增加了:
-
运维复杂度
使用了
Zookeeper
,部署Kafka
的时候必须要部署两套系统,Kafka
的运维人员必须要具备Zookeeper
的运维能力。 摒弃zookeeper的Kafka部署也会变得更加简单。
-
Controller故障处理
Kafaka
依赖一个单一Controller
节点跟Zookeeper
进行交互,如果这个Controller
节点发生了故障,就需要从broker
中选择新的Controller
。新的Controller
选举成功后,会重新从Zookeeper
拉取元数据进行初始化,并且需要通知其他所有的broker
更新ActiveControllerId
。老的Controller
需要关闭监听、事件处理线程和定时任务。分区数非常多时,这个过程非常耗时,而且这个过程中Kafka
集群是不能工作的。 Kraft模式中每个controller都拥有几乎update-to-date的Metadata,所以controller集群重新选主时恢复时间很短。
-
分区瓶颈
当分区数增加时,
Zookeeper
保存的元数据变多,Zookeeper
集群压力变大,达到一定级别后,监听延迟增加,给Kafaka
的工作带来了影响。 官方介绍,
Kraft模式
可以轻松支持百万级别的分区,partition的主选举将变得更快捷。
在大规模集群和云原生的背景下,使用Zookeeper
给Kafka
的运维和集群性能造成了很大的压力,去除Zookeeper
的必然趋势。
2-6 可视化工具
下载地址:https://www.kafkatool.com/download.html
提示配置相应的连接信息:
kafka术语和配置介绍
参考技术A producer 是生产者,负责消息生产,上游程序中按照标准的消息格式组装(按照每个消息事件的字段定义)发送到指定的topic。producer生产消息的时候,不会因为consumer处理能力不够,而阻塞producer的生产。consumer会从指定的topic 拉取消息,然后处理消费,并提交offset(消息处理偏移量,消费掉的消息并不会主动删除,而是kafka系统根据保存周期自动消除)。topic是消费分类存储的队列,可以按照消息类型来分topic存储。
replication是topic复制副本个数,用于解决数据丢失,防止leader topic宕机后,其他副本可以快代替。
broker是缓存代理,Kafka集群中的一台或多台服务器统称broker,用来保存producer发送的消息。Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
partition是topic的物理分组,在创建topic的时候,可以指定partition 数量。每个partition是逻辑有序的,保证每个消息都是顺序插入的,而且每个消息的offset在不同partition的是唯一不同的
偏移量。kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。每次消息处理完后,要么主动提交offset,要么自动提交,把offset偏移到下一位,如处理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自动提交偏移量
分组。是指在消费同一topic的不同consumer。每个consumer都有唯一的groupId,同一groupId 属于同一个group。不同groupId的consumer相互不影响。对于一个topic,同一个group的consumer数量不能超过 partition数量。比如,Topic A 有 16个partition,某一个group下有2个consumer,那2个consumer分别消费8个partition,而这个group的consumer数量最多不能超过16个。
kafka的配置主要分四类,分别是zookeeper、server、consumer、producer。其他的配置可以忽略。
zk的配置比较简单,也可以默认不改.dataDir是zk存储节点配置的目录地址,clientPort是zk启动的端口,默认2181,maxClientCnxns是限制ip的连接此处,设置0表示无连接次数,一般情况根据业务部署情况,配置合理的值。
以上是关于Kafka详解(上)——消息系统分类Kafka安装两种启动基本概念两种架构核心配置文件的主要内容,如果未能解决你的问题,请参考以下文章