消息中间件之Kafka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件之Kafka相关的知识,希望对你有一定的参考价值。

参考技术A

Producer: 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka 服务器上。

Consumer: 消费者,接受消息的一方。消费者连接到 Kafka 服务器上并接收消息,进而进行相应的业务逻辑处理。

Consumer Group: 消费者集合,一个消费者组可以包含一个或者多个消费者。使用多分区 + 多消费者的方式,可以极大提高下游系统处理速度。同一消费者组中的消费者不会重复消费消息,不同的消费者组之间不会互相影响,都能收到全部消息。kafka就是通过消费组来实现P2P模式和广播模式的。

Broker: Kafka 服务器。

Topic: Kafka中的消息维度,一个Topic类似一个queue。生产者将消息发送到特定的Topic,消费者通过Topic进行消费。

Partition: 分区,分区是属于Topic逻辑概念下的一个分区,每个分区只属于一个Topic,一个Topic通常有多个分区,每个分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志文件,消息在被追加到分区日志文件时,会分配一个特定的便宜了(offset)。

Offset: 分区中的消息的唯一标识,用它来保证消息在分区内的顺序性,offset不跨分区,也就是说,Kafka保证消息在分区内的有序性,不保证消息在Topic下的有序性

Replication: 副本,是Kafka保证数据高可用的方式。同一Partition的数据可以在多个Broker(kafka服务器)上存在多个副本,通常只有主副本提供读写服务,当主副本发生故障,Kafka会在Controller的管理下,选择新的副本作为主副本提供读写服务

Follower: 从副本,相对于主副本,从副本只同步主副本数据,不提供读写服务。

Record: 写入kafka中的消息,每个消息包含了key、value和timestamp。

生产者-消费者是一种设计模式,是在生产者和消费者之间添加一个中间件来达到解耦的目的。

Zookeeper是一个成熟的分布式协调服务,它可以为分布式服务提供分布式配置服务、同步服务和命名注册等能力。任何分布式服务都需要一种协调任务的方法,Kafka使用Zookeeper来进行任务协调,也有一些其他技术具有自己的内置任务协调机制。

Kafka将Broker、Topic和Partitin的元数据存储在Zookeeper上。

Kafka使用Zookeeper完成以下功能:

Controller是从Broker中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生变化,由Controller负责为该分区选举新的 leader 副本。当某个分区的同步副本集合发生变化时,由Controller负责通知所有Broker更新元数据信息。

Controller的选举依赖Zookeeper,成功竞选为控制器的Broker会在Zookeeper中创建一个/controller临时节点。

选举过程: Broker首先尝试读取/controller节点中的brokerid值,如果brokerid值不为-1,表示已经存在Broker当选Controller,否则尝试创建/controller节点,创建成功后将当前brokerid写入/controller节点,作为 activeControllerId

主要职责: controller选举出来作为整个Broker集群的管理者,管理所有集群信息和元数据。

Kafka 的网络通信模型是基于 NIO 的Reactor 多线程模型来设计的。其中包含一个Acceptor线程用于处理连接,多个 Processor 线程 select 和 read socket 请求,一个Processor 由包含多个 Handler 线程处理请求并响应。

顺序写:

零拷贝:

PageCache: producer 生成消息到 Broker 时,Broker 会使用 pwrite() 系统调用,按偏移量写入数据。写入时,会先写入 page cache。Consumer 消费消息时,Broker会使用sendfile() 系统调用,零拷贝的将数据从 page cache 传输到 Broker 的 Socket Buffer,通过网络传输。因此当Kafka的生产速率和消费速率相差不大时,就能几乎只靠 page cache 的读写完成整个生产-消费过程,磁盘访问非常少

网络模型: Kafka基于NIO,采用Reactor线程模型,实现了自己的RPC通信。 一个Acceptor线程处理新的连接,多个Processor线程select 和 read socket请求,多个Handler线程处理请求并响应(I/O多路复用)。

批量与压缩: Kafka Producer 向 Broker 发送消息不是一条一条发送,而是按批发送。且roducer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。

分区并发: Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。

文件结构:

Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。

Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。

index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。

Kafka 充分利用二分法来查找对应 offset 的消息位置

和其他消息队列相比,Kafka的优势在哪里?

队列模型了解吗?Kafka 的消息模型知道吗?

Kafka 如何保证消息不重复消费?

kafka出现消息重复消费的原因:

解决方案:

参考1: Kafka性能篇:为何Kafka这么"快"?

参考2: Kafka原理篇:图解kakfa架构原理

消息中间件之kafka

发送端的可选配置

acks
acks 配置表示 producer 发送消息到 broker 上以后的确认值。有三个可选项
Ø 0:表示 producer 不需要等待 broker 的消息确认。这个选项时延最小但同
时风险最大(因为当 server 宕机时,数据将会丢失)。
Ø 1:表示 producer 只需要获得 kafka 集群中的 leader 节点确认即可,这个
选择时延较小同时确保了 leader 节点确认接收成功。
Ø all(-1):需要 ISR 中所有的 Replica 给予接收确认,速度最慢,安全性最高,
但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 并不能一 定避免数据丢失

batch.size
生产者发送多个消息到 broker 上的同一个分区时,为了减少网络请求带来的
性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的 字节数大小,默认大小是 16384byte,也就是 16kb,意味着当一批消息大小达
到指定的 batch.size 的时候会统一发送

linger.ms
Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合
然后再发送,以此提高吞吐量,而 linger.ms 就是为每次发送到 broker 的请求
增加一些 delay,以此来聚合更多的 Message 请求。 这个有点想 TCP 里面的
Nagle 算法,在 TCP 协议的传输中,为了减少大量小数据包的发送,采用了
Nagle 算法,也就是基于小包的等-停协议。
Ø batch.size 和 linger.ms 这两个参数是 kafka 性能优化的关键参数,很多同
学会发现 batch.size 和 linger.ms 这两者的作用是一样的,如果两个都配置
了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要 求,就会发送请求到 broker 上

max.request.size
设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默 认值为 1MB

(1.0以后,kafka‘都是走的异步发送)

消费端的可选配置

group.id

consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是
一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),
它们共享一个公共的 ID,即 group ID。组内的所有消费者协调在一起来消费订
阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一
个消费组内的一个 consumer 来消费.如下图所示,分别有三个消费者,属于两
个不同的 group,那么对于 firstTopic 这个 topic 来说,这两个组的消费者都
能同时消费这个 topic 中的消息,对于此事的架构来说,这个 firstTopic 就类
似于 ActiveMQ 中的 topic 概念。如右图所示,如果 3 个消费者都属于同一个
group,那么此事 firstTopic 就是一个 Queue 的概念

技术图片

 

 

enable.auto.commit
消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接 收到,还可以配合 auto.commit.interval.ms 控制自动提交的频率。
当然,我们也可以通过 consumer.commitSync()的方式实现手动提交

auto.offset.reset 

这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来
消费指定的 topic 时,对于该参数的配置,会有不同的语义

auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的
offset 处开始消费 Topic 下的消息
auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始 消费
auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在
offset,则会直接抛出异常。

max.poll.records 

此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔
要处理的最大值。通过调整此值,可以减少 poll 间隔

关于 Topic 和 Partition 

Topic

在kafka中,topic是一个存储消息的逻辑概念,可以认为 是一个消息集合。每条消息发送到 kafka 集群的消息都有 一个类别。物理上来说,不同的 topic 的消息是分开存储 的, 每个 topic 可以有多个生产者向它发送消息,也可以有多 个消费者去消费其中的消息。 

技术图片

 

 Partition

每个 topic 可以划分多个分区(每个 Topic 至少有一个分 区),同一topic下的不同分区包含的消息是不同的。每个 消息在被添加到分区时,都会被分配一个offset(称之为偏 移量),它是消息在此分区中的唯一编号,kafka通过offset 保证消息在分区内的顺序,offset的顺序不跨分区,即kafka 只保证在同一个分区内的消息是有序的。

下图中,对于名字为test的topic,做了3个分区,分别是 p0、p1、p2. 

? 每一条消息发送到 broker 时,会根据 partition 的规则 选择存储到哪一个partition。如果partition规则设置合 理,那么所有的消息会均匀的分布在不同的partition中, 这样就有点类似数据库的分库分表的概念,把数据做了 分片处理。 

技术图片

 

 Topic&Partition 的存储 

 Partition 是以文件的形式存储在文件系统中,比如创建一 个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录, firstTopic-0~3, 命名规则是<topic_name>-<partition_id>

./kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 -topic firstTopic 

 消息分发

消息是kafka中最基本的数据单元,在kafka中,一条消息 由key、value两部分构成,在发送一条消息时,我们可以 指定这个 key,那么 producer 会根据 key 和 partition 机 制来判断当前这条消息应该发送并存储到哪个partition中。 我们可以根据需要进行扩展producer的partition机制。

消息默认分发机制

默认情况下,kafka采用的是hash取模的分区算法。如果 Key为null,则会随机分配一个分区。这个随机是在这个参 数”metadata.max.age.ms”的时间范围内随机选择一个。对 于这个时间段内,如果 key 为 null,则只会发送到唯一的 分区。这个值值哦默认情况下是10分钟更新一次。 关于 Metadata,简单理解就是 Topic/Partition 和 broker 的映射关系,每一个 topic 的每 一个partition,需要知道对应的broker列表是什么,leader 是谁、follower是谁。这些信息都是存储在Metadata这个类里面。

ps:consumer和partition的数量最好保持一致。如果有3个partition,5个consumer,那么只有三个consumer工作,只有两个consumer,那么只有两个consumer工作

kafka 消息消费原理

在实际生产过程中,每个topic都会有多个partitions,多 个partitions的好处在于,一方面能够对broker上的数据 进行分片有效减少了消息的容量从而提升io性能。另外一 方面,为了提高消费端的消费能力,一般会通过多个 consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,在多个partition以 及多个consumer的情况下,消费者是如何消费消息的 ,kafka存在consumer group 的概念,也就是 group.id 一样的 consumer,这些 consumer属于一个consumer group,组内的所有消费者 协调在一起来消费订阅主题的所有分区。当然每一个分区 只能由同一个消费组内的 consumer 来消费,那么同一个 consumer group 里面的 consumer 是怎么去分配该消费 哪个分区里的数据的呢?如下图所示,3个分区,3个消费 者,那么哪个消费者消分哪个分区? 

技术图片

 

 对于上面这个图来说,这3个消费者会分别消费test这个 topic 的 3 个分区,也就是每个 consumer 消费一个 partition。 

分区分配策略 

同一个group中的消费者对于一个 topic 中的多个 partition,存在一定的 分区分配策略。 在kafka中,存在两种分区分配策略,一种是Range(默认)、

另一种另一种还是 RoundRobin(轮询)。 通过 partition.assignment.strategy这个参数来设置。

Range strategy(范围分区) 

Range 策略是对每个主题而言的,首先对同一个主题里面 的分区按照序号进行排序,并对消费者按照字母顺序进行 排序。假设我们有10个分区,3个消费者,排完序的分区 将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是 C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线 程的总数来决定每个消费者线程消费几个分区。如果除不 尽,那么前面几个消费者线程将会多消费一个分区。在我 们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一 个分区,所以最后分区分配 的结果看起来是这样的: C1-0 将消费 0, 1, 2, 3 分区  C2-0 将消费 4, 5, 6 分区  C3-0 将消费 7, 8, 9 分区 假如我们有 11 个分区,那么最后分区分配的结果看起来是这样的: 

C1-0 将消费 0, 1, 2, 3 分区 

C2-0 将消费 4, 5, 6, 7 分区 

C3-0 将消费 8, 9, 10 分区

假如我们有 2 个主题(T1 和 T2),分别有 10 个分区,那么最后 分区分配的结果看起来是这样的:

C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区 

C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区 

C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了 2 个 分区,这就是Range strategy的一个很明显的弊端 

RoundRobin strategy(轮询分区) 

轮询分区策略是把所有partition和所有consumer线程都 列出来,然后按照 hashcode 进行排序。最后通过轮询算 法分配partition给消费线程。如果所有consumer实例的 订阅是相同的,那么partition会均匀分布。

在我们的例子里面,假如按照 hashCode 排序完的topicpartitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C20, C2-1,最后分区分配的结果为: 

C1-0 将消费 T1-5, T1-2, T1-6 分区;

C1-1 将消费 T1-3, T1-1, T1-9 分区;

C2-0 将消费 T1-0, T1-4 分区;

C2-1 将消费 T1-8, T1-7 分区;

使用轮询分区策略必须满足两个条件

1. 每个主题的消费者实例具有相同数量的流

2. 每个消费者订阅的主题必须是相同的 

rebalance策略的触发

当出现以下几种情况时,kafka 会进行一次分区分配操作, 也就是kafka consumer的rebalance

1. 同一个consumer group内新增了消费者

2. 消费者离开当前所属的 consumer group,比如主动停 机或者宕机

3. topic新增了分区(也就是分区数量发生了变化) 

coordinator角色

谁来执行 Rebalance 以及管理 consumer 的 group 呢? Kafka提供了一个角色:coordinator来执行对于consumer group的管理,Kafka提供了一个角色:coordinator来执 行对于consumer group的管理,当consumer group的 第一个 consumer 启动的时候,它会去和 kafka server 确 定谁是它们组的 coordinator。之后该 group 内的所有成 员都会和该coordinator进行协调通信 

coordinator角色的确定

consumer group如何确定自己的coordinator是谁呢, 消 费者向 kafka 集群中的任意一个 broker 发送一个 GroupCoordinatorRequest 请求,服务端会返回一个负载 最小的 broker 节点的 id,并将该 broker 设置为 coordinator 

整个reblance过程分为两个步骤,

Join和Sync join: 表示加入到consumer group中,在这一步中,所有 的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会 选择一个consumer担任leader角色,并把组成员信息和 订阅信息发送消费者 

技术图片

 

 

完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是向 GroupCoordinator 发送 SyncGroupRequest请求,并且处理SyncGroupResponse 响应,简单来说,就是leader将消费者对应的partition分 配方案同步给consumer group 中的所有consumer 

技术图片

 

 每个消费者都会向 coordinator 发送 syncgroup 请求,不 过只有leader节点会发送分配方案,其他消费者只是打打 酱油而已。当 leader 把方案发给 coordinator 以后, coordinator会把结果设置到SyncGroupResponse中。这 样所有成员都知道自己应该消费哪个分区。 

? consumer group 的分区分配方案是在客户端执行的! Kafka 将这个权利下放给客户端主要是因为这样做可以 有更好的灵活性 

如何保存消费端的消费位置

如何保存消费端的消费位置 

前面在讲解 partition 的时候,提到过 offset, 每个 topic可以划分多个分区(每个 Topic 至少有一个分区),同一 topic下的不同分区包含的消息是不同的。每个消息在被添 加到分区时,都会被分配一个offset(称之为偏移量),它 是消息在此分区中的唯一编号,kafka通过offset保证消息 在分区内的顺序,offset的顺序不跨分区,即kafka只保证 在同一个分区内的消息是有序的; 对于应用层的消费来说, 每次消费一个消息并且提交以后,会保存当前消费到的最 近的一个offset。那么offset保存在哪里? 

技术图片

 

 消息的存储

消息的保存路径
消息发送端发送消息到broker上以后,消息是如何持久化 的呢?那么接下来去分析下消息的存储 首先我们需要了解的是,kafka是使用日志文件的方式来保 存生产者和发送者的消息,每条消息都有一个 offset 值来 表示它在分区中的偏移量。Kafka中存储的一般都是海量的 消息数据,为了避免日志文件过大,Log 并不是直接对应 在一个磁盘上的日志文件,而是对应磁盘上的一个目录, 这个目录的明明规则是<topic_name>_<partition_id> 比如创建一个名为firstTopic的topic,其中有3个partition, 那么在kafka的数据目录(/tmp/kafka-log)中就有3个目 录,firstTopic-0~3

多个分区在集群中的分配 

如果我们对于一个topic,在集群中创建多个partition,那 么partition是如何分布的呢?

1.将所有N Broker和待分配的i个Partition排序

2.将第i个Partition分配到第(i mod n)个Broker上 

技术图片

 

 paitition的高可用副本机制

我们已经知道Kafka的每个topic都可以分为多个Partition, 并且多个partition会均匀分布在集群的各个节点下。虽然 这种方式能够有效的对数据进行分片,但是对于每个 partition来说,都是单点的,当其中一个partition不可用 的时候,那么这部分消息就没办法消费。所以 kafka 为了提高partition的可靠性而提供了副本的概念(Replica) ,通 过副本机制来实现冗余备份。 每个分区可以有多个副本,并且在副本集合中会存在一个 leader的副本,所有的读写请求都是由leader副本来进行 处理。剩余的其他副本都做为 follower 副本,follower 副 本会从 leader 副本同步消息日志。这个有点类似 zookeeper中leader和follower的概念,但是具体的时间 方式还是有比较大的差异。所以我们可以认为,副本集会 存在一主多从的关系。 一般情况下,同一个分区的多个副本会被均匀分配到集群 中的不同broker上,当leader副本所在的broker出现故 障后,可以重新选举新的 leader 副本继续对外提供服务。 通过这样的副本机制来提高kafka集群的可用性。

副本分配算法

将所有N Broker和待分配的i个Partition排序.

将第i个Partition分配到第(i mod n)个Broker上.

将第i个Partition的第j个副本分配到第((i + j) mod n)个 Broker上. 

创建一个带副本机制的 topic 

通过下面的命令去创建带2个副本的topic,

./kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 2 --partitions 3 -topic secondTopic 

然后我们可以在/tmp/kafka-log 路径下看到对应 topic 的 副本信息了。我们通过一个图形的方式来表达。

? 针对 secondTopic 这个 topic 的 3 个分区对应的 3 个副 本 

技术图片

 

 如何知道那个各个分区中对应的leader是谁呢?

在zookeeper服务器上,通过如下命令去获取对应分区的 信息, 比如下面这个是获取 secondTopic 第 1 个分区的状 态信息。 

get /brokers/topics/secondTopic/partitions/1/state

? {"controller_epoch":12,"leader":0,"version":1,"leader_ep och":0,"isr":[0,1]} 

leader表示当前分区的leader是那个broker-id。下图中。

绿色线条的表示该分区中的 leader 节点。其他节点就为 follower

技术图片

 

 Kafka 提供了数据复制算法保证,如果 leader 发生故障或 挂掉,一个新leader被选举并被接受客户端的消息成功写 入。Kafka确保从同步副本列表中选举一个副本为leader; leader 负责维护和跟踪 ISR(in-Sync replicas , 副本同步 队列)中所有 follower 滞后的状态。当 producer 发送一条 消息到broker后,leader写入消息并复制到所有follower。 消息提交之后才被成功复制到所有的同步副本。

? 既然有副本机制,就一定涉及到数据同步的概念,那接 下来分析下数据是如何同步的?

需要注意的是,大家不要把 zookeeper 的 leader 和 follower 的同步机制和 kafka 副本的同步机制搞混了。虽 然从思想层面来说是一样的,但是原理层面的实现是完全 不同的。

 kafka 副本机制中的几个概念 

Kafka 分区下有可能有很多个副本(replica)用于实现冗余, 从而进一步实现高可用。副本根据角色的不同可分为3类:
leader副本:响应clients端读写请求的副本

follower副本:被动地备份leader副本中的数据,不能响 应clients端读写请求。

ISR副本:包含了leader副本和所有与leader副本保持同 步的 follower 副本——如何判定是否与 leader 同步后面 会提到每个 Kafka 副本对象都有两个重要的属性:LEO 和 HW。注意是所有的副本,而不只是leader副本。 

LEO:即日志末端位移(log end offset),记录了该副本底层 日志(log)中下一条消息的位移值。注意是下一条消息!也 就是说,如果LEO=10,那么表示该副本保存了10条消息, 位移值范围是[0, 9]。另外,leader LEO和follower LEO的 更新是有区别的。我们后面会详细说

HW:即上面提到的水位值。对于同一个副本对象而言,其 HW值不会大于LEO值。小于等于HW值的所有消息都被 认为是“已备份”的(replicated)。同理,leader 副本和 follower副本的HW更新是有区别的 

副本协同机制 

刚刚提到了,消息的读写操作都只会由leader节点来接收 和处理。follower副本只负责同步数据以及当leader副本 所在的 broker 挂了以后,会从 follower 副本中选取新的 leader。 

技术图片

 

 

 写请求首先由 Leader 副本处理,之后 follower 副本会从 leader 上拉取写入的消息,这个过程会有一定的延迟,导 致follower副本中保存的消息略少于leader副本,但是只 要没有超出阈值都可以容忍。但是如果一个 follower 副本,出现异常,比如宕机、网络断开等原因长时间没有同步到 消息,那这个时候,leader就会把它踢出去。kafka通过ISR 集合来维护一个分区副本信息 

ISR

ISR 表示目前“可用且消息量与 leader 相差不多的副本集合, 这是整个副本集合的一个子集”。怎么去理解可用和相差不多 这两个词呢?具体来说,ISR集合中的副本必须满足两个条件 

1. 副本所在节点必须维持着与zookeeper的连接

2. 副本最后一条消息的 offset 与 leader 副本的最后一条消息 的 offset 之 间 的 差 值 不 能 超 过 指 定 的 阈值 (replica.lag.time.max.ms) replica.lag.time.max.ms:如果该 follower 在此时间间隔 内一直没有追上过leader的所有消息,则该follower就 会被剔除isr列表

? ISR 数 据 保 存 在 Zookeeper 的 /brokers/topics/<topic>/partitions/<partitionId>/stat e节点中

HW&LEO

 关于follower副本同步的过程中,还有两个关键的概念, HW(HighWatermark)和 LEO(Log End Offset). 这两个参数跟ISR集合紧密关联。HW标记了一个特殊的offset,当 消费者处理消息的时候,只能拉去到HW之前的消息,HW 之后的消息对消费者来说是不可见的。也就是说,取 partition 对应 ISR 中最小的 LEO 作为 HW,consumer 最 多只能消费到 HW 所在的位置。每个 replica 都有 HW, leader和follower各自维护更新自己的HW的状态。一条 消息只有被 ISR 里的所有 Follower 都从 Leader 复制过去 才会被认为已提交。这样就避免了部分数据被写进了 Leader,还没来得及被任何Follower复制就宕机了,而造 成数据丢失(Consumer 无法消费这些数据)。而对于 Producer 而言,它可以选择是否等待消息 commit,这可 以通过acks来设置。这种机制确保了只要ISR有一个或以 上的Follower,一条被commit的消息就不会丢失。

数据的同步过程 

了解了副本的协同过程以后,还有一个最重要的机制,就 是数据的同步过程。它需要解决 

1. 怎么传播消息

2. 在向消息发送端返回 ack 之前需要保证多少个 Replica 已经接收到这个消息 

数据的处理过程是 

Producer 在发布消息到某个 Partition 时,先通过ZooKeeper 找到该 Partition 的 Leader 【 get /brokers/topics/<topic>/partitions/2/state】,然后无论该 Topic的Replication Factor为多少(也即该Partition有多 少个Replica),Producer只将该消息发送到该Partition的 Leader。Leader会将该消息写入其本地Log。每个Follower 都从Leader pull数据。这种方式上,Follower存储的数据 顺序与Leader保持一致。Follower在收到该消息并写入其 Log后,向Leader发送ACK。一旦Leader收到了ISR中 的所有Replica的ACK,该消息就被认为已经commit了, Leader 将增加 HW(HighWatermark)并且向 Producer 发送 ACK。

初始状态

初始状态下,leader 和 follower 的 HW 和 LEO 都是 0, leader副本会保存remote LEO,表示所有follower LEO, 也会被初始化为0,这个时候,producer没有发送消息。 follower会不断地个leader发送FETCH请求,但是因为没 有数据,这个请求会被leader寄存,当在指定的时间之后 会 强 制 完 成 请 求 , 这 个 时 间 配 置 是 (replica.fetch.wait.max.ms),如果在指定时间内 producer 有消息发送过来,那么kafka会唤醒fetch请求,让leader 继续处理 

 

技术图片

这里会分两种情况,第一种是 leader 处理完 producer 请 求之后,follower 发送一个 fetch 请求过来、第二种是 follower 阻塞在 leader 指定时间之内,leader 副本收到 producer的请求。这两种情况下处理方式是不一样的。先 来看第一种情况 

follower 的 fetch 请求是当 leader 处理消息以后执行的 

生产者发送一条消息 

? leader 处理完 producer 请求之后,follower 发送一个 fetch请求过来 。状态图如下 

技术图片

 

 

 

 

 leader副本收到请求以后,会做几件事情 

1. 把消息追加到log文件,同时更新leader副本的LEO

2. 尝试更新leader HW值。这个时候由于follower副本还 没有发送fetch请求,那么leader的remote LEO仍然 是0。leader会比较自己的LEO以及remote LEO的值 发现最小值是0,与HW的值相同,所以不会更新HW

follower fetch 消息 

技术图片

 

 follower 发送fetch请求,leader副本的处理逻辑是:

1. 读取log数据、更新remote LEO=0(follower还没有写 入这条消息,这个值是根据 follower 的 fetch 请求中的 offset来确定的)

2. 尝试更新 HW,因为这个时候 LEO 和 remoteLEO 还是 不一致,所以仍然是HW=0

3. 把消息内容和当前分区的HW值发送给follower副本

follower副本收到response以后   

 

 

1. 将消息写入到本地log,同时更新follower的LEO

2. 更新 follower HW,本地的 LEO 和 leader 返回的 HW 进行比较取小的值,所以仍然是0 

第一次交互结束以后,HW仍然还是0,这个值会在下一次 follower发起fetch请求时被更新

 

技术图片

 

 follower发第二次fetch请求,leader收到请求以后 

1. 读取log数据

2. 更新remote LEO=1, 因为这次fetch携带的offset是 1.

3. 更新当前分区的HW,这个时候leader LEO和remote LEO都是1,所以HW的值也更新为1

4. 把数据和当前分区的HW值返回给follower副本,这个 时候如果没有数据,则返回为空 

follower副本收到response以后

1. 如果有数据则写本地日志,并且更新LEO

2. 更新follower的HW值 

到目前为止,数据的同步就完成了,意味着消费端能够消 费offset=0这条消息。

follower 的 fetch 请求是直接从阻塞过程中触发 

前面说过,由于 leader 副本暂时没有数据过来,所以 follower 的 fetch 会被阻塞,直到等待超时或者 leader 接 收到新的数据。当leader收到请求以后会唤醒处于阻塞的 fetch请求。处理过程基本上和前面说的一直 

1. leader将消息写入本地日志,更新Leader的LEO

2. 唤醒follower的fetch请求

3. 更新HW 

kafka使用HW和LEO的方式来实现副本数据的同步,本身是一个好的设计,但是在这个地方会存在一个数据丢失 的问题,当然这个丢失只出现在特定的背景下。我们回想 一下,HW的值是在新的一轮FETCH 中才会被更新。我们 分析下这个过程为什么会出现数据丢失 

数据丢失的问题

前提:min.insync.replicas=1的时候。 ->设定ISR中的最小 副本数是多少,默认值为 1, 当且仅当 acks 参数设置为-1 (表示需要所有副本确认)时,此参数才生效. 表达的含义 是,至少需要多少个副本同步才能表示消息是提交的

所以,当min.insync.replicas=1的时候

一旦消息被写入 leader 端 log 即被认为是“已提交”,而延 迟一轮 FETCH RPC 更新 HW 值的设计使得 follower HW 值是异步延迟更新的,倘若在这个过程中leader发生变更, 那么成为新 leader 的 follower 的 HW 值就有可能是过期 的,使得clients端认为是成功提交的消息被删除。

技术图片

 

 数据丢失的解决方案

在 kafka0.11.0.0 版本以后,提供了一个新的解决方案,使 用leader epoch来解决这个问题,leader epoch实际上是 一对之(epoch,offset), epoch 表示 leader 的版本号,从 0 开始,当leader变更过1次时epoch就会+1,而 offset则 对应于该 epoch 版本的 leader 写入第一条消息的位移。 比如说

(0,0) ; (1,50); 

表示第一个leader从offset=0开始写消息, 一共写了50条,第二个leader版本号是1,从50条处开 始写消息。这个信息保存在对应分区的本地磁盘文件中, 文 件 名 为 : /tml/kafka-log/topic/leader-epochcheckpoint

leader broker中会保存这样的一个缓存,并定期地写入到 一个checkpoint文件中。 当 leader 写 log 时它会尝试更新整个缓存——如果这个 leader 首次写消息,则会在缓存中增加一个条目;否则就 不做更新。而每次副本重新成为leader时会查询这部分缓 存,获取出对应leader版本的offset 

技术图片

 

 如何处理所有的Replica不工作的情况 

在 ISR 中至少有一个 follower 时,Kafka 可以确保已经 commit 的数据不丢失,但如果某个 Partition 的所有 Replica都宕机了,就无法保证数据不丢失了 

1. 等待 ISR 中的任一个 Replica“活”过来,并且选它作为 Leader 

2. 选择第一个“活”过来的Replica(不一定是ISR中的)作 为Leader

这就需要在可用性和一致性当中作出一个简单的折衷。 如果一定要等待 ISR 中的 Replica“活”过来,那不可用的时 间就可能会相对较长。而且如果ISR中的所有Replica都无 法“活”过来了,或者数据都丢失了,这个 Partition 将永远不可用。 

选择第一个“活”过来的 Replica 作为 Leader,而这个 Replica 不是 ISR 中的 Replica,那即使它并不保证已经包 含了所有已 commit 的消息,它也会成为 Leader 而作为 consumer 的数据源(前文有说明,所有读写都由 Leader 完成)。在我们课堂讲的版本中,使用的是第一种策略。

ISR 的设计原理

在所有的分布式存储中,冗余备份是一种常见的设计方式,而 常用的模式有同步复制和异步复制,按照kafka这个副本模型 来说 

如果采用同步复制,那么需要要求所有能工作的 Follower 副 本都复制完,这条消息才会被认为提交成功,一旦有一个 follower副本出现故障,就会导致HW无法完成递增,消息就 无法提交,消费者就获取不到消息。这种情况下,故障的 Follower副本会拖慢整个系统的性能,设置导致系统不可用 如果采用异步复制,leader副本收到生产者推送的消息后,就 认为次消息提交成功。follower 副本则异步从 leader 副本同 步。这种设计虽然避免了同步复制的问题,但是假设所有 follower副本的同步速度都比较慢他们保存的消息量远远落后 于 leader 副本。而此时 leader 副本所在的 broker 突然宕机, 则会重新选举新的 leader 副本,而新的 leader 副本中没有原来leader副本的消息。这就出现了消息的丢失。

kafka 权衡了同步和异步的两种策略,采用 ISR 集合,巧妙解 决了两种方案的缺陷:当follower副本延迟过高,leader副本 则会把该follower副本提出ISR集合,消息依然可以快速提交。 当 leader 副本所在的 broker 突然宕机,会优先将 ISR 集合中 follower 副本选举为 leader,新 leader 副本包含了 HW 之前 的全部消息,这样就避免了消息的丢失。

 

以上是关于消息中间件之Kafka的主要内容,如果未能解决你的问题,请参考以下文章

消息中间件之kafka

大数据消息中间件之Kafka02

消息中间件之kafka

消息中间件之kafka

MQ之Kafka

初识中间件之消息队列