[转载]kafka入门笔记

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[转载]kafka入门笔记相关的知识,希望对你有一定的参考价值。

参考技术A 小马最近学习了《深入理解kafka 核心设计与实践原理》朱忠华 著 一书,机缘巧合中又看到了这篇文章,觉得整理得很是详细和全面,图文并茂很直观,在此摘录。

精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。

HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。

相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。

Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

Kafka 节点的 broker涉及 Topic、Partition 两个重要概念

在 Kafka 架构中,有几个术语:

Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;

Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;

Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;

Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;

Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;

Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;

replicas :partition 的副本,保障 partition 的高可用;

leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;

follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;

controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;

ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。

一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:

Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。

上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。

言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:

如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。

上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsets.topic.replication.factor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replica.lag.time.max.ms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。

上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。

下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。

Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:

/brokers/topics/[topic]/partitions/[partition]/state

目前,有两个地方会对这个 ZooKeeper 的节点进行维护。

Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。

leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。

考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?

类似于木桶原理,水位取决于最低那块短板。

如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) 操作,在宕机时 log 文件之后直接做追加操作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步操作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。

如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。

当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。

在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:

#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:

__consumer_offsets(/brokers/topics/__consumer_offsets)

并且默认提供了 kafka_consumer_groups.sh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groups.sh –bootstrap-server * –describe –group *)。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offset.store.method 的配置,默认是存储在 broker 端。

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。

参考原文:

再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理

Apache Kafka 核心组件和流程-日志管理器-设计-原理(入门教程轻松学)

作者:稀有气体
来源:CSDN
原文:https://blog.csdn.net/liyiming2017/article/details/82805479
版权声明:本文为博主原创文章,转载请附上博文链接!

本入门教程,涵盖Kafka核心内容,通过实例和大量图表,帮助学习者理解,任何问题欢迎留言。

目录:

上一节介绍了协调器。协调器主要负责消费者和kafka集群间的协调。那么消费者消费时,如何定位消息呢?消息是如何存储呢?本节将为你揭开答案。

3 日志管理器

3.1 日志的存储

Kafka的消息以日志文件的形式进行存储。不同主题下不同分区的消息是分开存储的。同一个分区的不同副本也是以日志的形式,分布在不同的broker上存储。

这样看起来,日志的存储是以副本为单位的。在程序逻辑上,日志确实是以副本为单位的,每个副本对应一个log对象。但实际在物理上,一个log又划分为多个logSegment进行存储。

举个例子,创建一个topic名为test,拥有3个分区。为了简化例子,我们设定只有1个broker,1个副本。那么所有的分区副本都存储在同一个broker上。

第二章中,我们在kafka的配置文件中配置了log.dirs=/tmp/kafka-logs。此时在/tmp/kafka-logs下面会创建test-0,test-1,test-2三个文件夹,代表三个分区。命名规则为“topic名称-分区编号”

我们看test-0这个文件夹,注意里面的logSegment并不代表这个文件夹,logSegment代表逻辑上的一组文件,这组文件就是.log、.index、.timeindex这三个不同文件扩展名,但是同文件名的文件。

  1. .log存储消息
  2. .index存储消息的索引
  3. .timeIndex,时间索引文件,通过时间戳做索引。

这三个文件配合使用,用来保存和消费时快速查找消息。

刚才说到同一个logSegment的三个文件,文件名是一样的。命名规则为.log文件中第一条消息的前一条消息偏移量,也称为基础偏移量,左边补0,补齐20位。比如说第一个LogSegement的日志文件名为00000000000000000000.log,假如存储了200条消息后,达到了log.segment.bytes配置的阈值(默认1个G),那么将会创建新的logSegment,文件名为00000000000000000200.log。以此类推。另外即使没有达到log.segment.bytes的阈值,而是达到了log.roll.ms或者log.roll.hours设置的时间触发阈值,同样会触发产生新的logSegment。

 

3.2 日志定位

日志定位也就是消息定位,输入一个消息的offset,kafka如何定位到这条消息呢?

日志定位的过程如下:

1、根据offset定位logSegment。(kafka将基础偏移量也就是logsegment的名称作为key存在concurrentSkipListMap中)

2、根据logSegment的index文件查找到距离目标offset最近的被索引的offset的position x。

3、找到logSegment的.log文件中的x位置,向下逐条查找,找到目标offset的消息。

结合下图中例子,我再做详细的讲解:

这里先说明一下.index文件的存储方式。.index文件中存储了消息的索引,存储内容是消息的offset及物理位置position。并不是每条消息都有自己的索引,kafka采用的是稀疏索引,说白了就是隔n条消息存一条索引数据。这样做比每一条消息都建索引,查找起来会慢,但是也极大的节省了存储空间。此例中我们假设跨度为2,实际kafka中跨度并不是固定条数,而是取决于消息累积字节数大小。

例子中consumer要消费offset=15的消息。我们假设目前可供消费的消息已经存储了三个logsegment,分别是00000000000000000,0000000000000000010,0000000000000000020。为了讲解方便,下面提到名称时,会把前面零去掉。

下面我们详细讲一下查找过程。

  1. kafka收到查询offset=15的消息请求后,通过二分查找,从concurrentSkipListMap中找到对应的logsegment名称,也就是10。
  2. 从10.index中找到offset小于等于15的最大值,offset=14,它对应的position=340
  3. 从10.log文件中物理位置340,顺序往下扫描文件,找到offset=15的消息内容。

可以看到通过稀疏索引,kafka既加快了消息查找的速度,也顾及了存储的开销。

以上是关于[转载]kafka入门笔记的主要内容,如果未能解决你的问题,请参考以下文章

JavaScript 入门笔记 - 上 - 基本语法

ADO.NET 学习笔记 入门教程

spark学习笔记——sparkStreaming-概述/特点/构架/DStream入门程序wordcount

jQuery学习笔记:入门

jQuery学习笔记:入门

docker入门详细笔记