大数据之Kafka看这一篇就够了
Posted 进击的-小胖子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据之Kafka看这一篇就够了相关的知识,希望对你有一定的参考价值。
Kafka
异步通信原理
观察者模式
- 观察者模式(Observer),又叫发布-订阅模式(Publish/Subscribe)
- 定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则所有依赖于它的对象都会得到通知并自动更新。
- 一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知。
生产者消费者模式
-
传统模式
-
生产者直接将消息传递给指定的消费者
-
耦合性特别高,当生产者或者消费者发生变化,都需要重写业务逻辑
- 生产者消费者模式
-
-
生产者消费者模式
-
通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯
-
数据传递流程
-
生产者负责向缓冲区里面添加数据单元
-
消费者负责从缓冲区里面取出数据单元
- 一般遵循先进先出的原则
-
-
缓冲区
-
解耦
- 假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖
-
支持并发
- 生产者直接调用消费者的某个方法过程中函数调用是同步的
- 万一消费者处理数据很慢,生产者就会白白糟蹋大好时光
-
支持忙闲不均
- 缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。
- 当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。
- 等生产者的制造速度慢下来,消费者再慢慢处理掉。
数据单元
-
关联到业务对象
- 数据单元必须关联到某种业务对象
-
完整性
- 就是在传输过程中,要保证该数据单元的完整
-
独立性
- 就是各个数据单元之间没有互相依赖
- 某个数据单元传输失败不应该影响已经完成传输的单元;也不应该影响尚未传输的单元。
-
颗粒度
- 数据单元需要关联到某种业务对象。那么数据单元和业务对象应该处于的关系(一对一?一对多)
- 如果颗粒度过小会增加数据传输的次数
- 如果颗粒度过大会增加单个数据传输的时间,影响后期消费
消息系统原理
点对点消息传递
- 在点对点消息系统中,消息持久化到一个队列中一条消息只能被消费一次
- 当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除
- 该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序
- 基于推送模型的消息系统,由消息代理记录消费状态
发布订阅消息传递
- 在发布-订阅消息系统中,消息被持久化到一个topic中
- 消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除
- Kafka 采取拉取模型(Poll),由自己控制消费速度,消费者可以按照任意的偏移量进行消费
Kafka简介
概述
-
流处理平台,本质上是一个 MQ(Message Queue)
-
使用消息队列的好处
- 解耦:允许我们独立的扩展或修改队列两边的处理过程。
- 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 缓冲:有助于解决生产消息和消费消息的处理速度不一致的情况。
- 灵活性&峰值处理能力:不会因为突发的超负荷的请求而完全崩溃,消息队列能够使关键组件顶住突发的访问压力。
- 异步通信:消息队列允许用户把消息放入队列但不立即处理它
设计目标
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- 支持在线水平扩展
Kafka的优点
-
解耦
-
冗余
-
扩展性
- 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可
-
灵活性&峰值处理能力
- 在访问量剧增的情况下,应用仍然需要继续发挥作用
-
可恢复性
- 系统的一部分组件失效时,不会影响到整个系统
- 消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
-
顺序保证
- Kafka保证一个Partition内的消息的有序性
-
缓冲
- 消息队列通过一个缓冲层来帮助任务最高效率的执行
-
异步通信
- 消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它
Kafka系统架构
Broker
- Kafka 集群包含一个或多个服务器,服务器节点称为broker
Topic
- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
- 类似于数据库的table或者ES的Index
- 物理上不同Topic的消息分开存储
- 逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
-
topic中的数据分割为一个或多个partition
-
每个topic至少有一个partition,当生产者产生数据的时候,根据分配策略,选择分区,然后将消息追加到指定的分区的末尾(队列)
-
每条消息都会有一个自增的编号
- 标识顺序
- 用于标识消息的偏移量
- 每个Partition都有自己独立的编号
-
每个partition中的数据使用多个segment文件存储。
-
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。
-
如果topic有多个partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下,需要将partition数目设为1
Leader
- 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition
Follower
- Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。
- 如果Leader失效,则从Follower中选举出一个新的Leader。
- 当Follower挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower
replication
-
数据会存放到topic的partation中,但是有可能分区会损坏
-
我们需要对分区的数据进行备份(备份多少取决于你对数据的重视程度)
-
我们将分区的分为Leader(1)和Follower(N)
- Leader负责写入和读取数据
- Follower只负责备份
- 保证了数据的一致性
-
备份数设置为N,表示主+备=N(参考HDFS)
producer
- 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。
- broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
- 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition
consumer
- 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
- 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
- 将多个消费者集中到一起去处理某一个Topic的数据,可以更快的提高数据的消费能力
- 整个消费者组共享一组偏移量(防止数据被重复读取),因为一个Topic有多个分区
offset偏移量
- 可以唯一的标识一条消息
- 偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息
- 消息被消费之后,并不被马上删除,这样多个业务就可以重复使用kafka的消息
- 我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制
- 消息最终还是会被删除的,默认生命周期为1周(7*24小时)
Zookeeper
- kafka 通过 zookeeper 来存储集群的 meta 信息
- 新版本由partition自己保存
- 辅助选举
Kafka数据存储
topic在物理层面以partition为分组,一个topic可以分成若干个partition
partition还可以细分为Segment,一个partition物理上由多个Segment组成
-
segment 的参数有两个
- log.segment.bytes:单个segment可容纳的最大数据量,默认为1GB
- log.segment.ms:Kafka在commit一个未写满的segment前,所等待的时间(默认为7天)
LogSegment 文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为 Segment 索引文件和数据文件
- partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
- 数值大小为64位,20位数字字符长度,没有数字用0填充
消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止
“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 Message 的物理偏移量
- .index中顺序存储,存在硬盘中,预分配10M
其他文件
- .index 位移索引
- .timeindex 时间戳索引
- .snapshot文件,记录了producer的事务信息。(todo)
生产者数据安全
数据分区
-
分区原因
- 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,
- 一个 Topic 又可以有多个 Partition 组成,因此可以以 Partition 为单位读写了。
- 可以提高并发,因此可以以 Partition 为单位读写了
数据可靠性保证
-
ACK机制
- 为保证 Producer 发送的数据,能可靠地发送到指定的 Topic
- Topic 的每个 Partition 收到 Producer 发送的数据后,都需要向 Producer 发送ACK(ACKnowledge 确认收到)。
- 如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。
-
ACK时机
-
部分 Follower 与 Leader 同步完成,Leader 发送 ACK
-
全部 Follower 与 Leader 同步完成,Leader 发送 ACK。
-
ISR(部分)
-
关键词
- AR : Assigned Replicas 用来标识副本的全集
- OSR :out -sync Replicas 离开同步队列的副本
- ISR :in -sync Replicas 加入同步队列的副本
- ISR = Leader + 没有落后太多的副本;
- AR = OSR+ ISR。
-
Leader维护了一个动态的 in-sync replica set(ISR 和 Leader 保持同步的 Follower 集合)
-
当 ISR 集合中的 Follower 完成数据的同步之后,Leader 就会给 Follower 发送 ACK。
-
如果 Follower 长时间未向 Leader 同步数据,则该 Follower 将被踢出 ISR 集合,判断标准
-
超过10秒钟没有同步数据
- replica.lag.time.max.ms=10000
-
主副节点差4000条数据
- rerplica.lag.max.messages=4000
-
-
Leader 发生故障后,就会从 ISR 中选举出新的 Leader。 可设置从OSR中选举
- kafka采用一种降级措施来处理:
- 选举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举
-
-
-
ACK应答机制
-
Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡
-
acks=0
- 这意味着 Producer 无需等待来自 Leader的确认而继续发送下一批消息
- 当 Broker 故障时有可能丢失数据
- 最多一条
-
acks=1
- Producer 在 ISR 中的 Leader 已成功收到的数据并得到确认后发送下一条 Message
- 如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据
- 最少一条
-
acks=-1
- Producer 需要等待 ISR 中的所有 Follower 都确认接收到数据后才算一次发送完成,可靠性最高
- 不会出现脏选举
- 在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复
- 最少一条
-
-
-
故障处理
-
LEO:每个副本最大的 Offset
-
HW:消费者能见到的最大的 Offset,ISR 队列中最小的 LEO
-
Follower 故障
- Follower 发生故障后会被临时踢出 ISR 集合,待该 Follower 恢复后,Follower 会 读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步数据操作
- 等该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了
-
Leader 故障
- Leader 发生故障后,会从 ISR 中选出一个新的 Leader,之后,为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据
- 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
- ack=-1可以避免数据丢失
-
-
Exactly Once 语义
-
将服务器的 ACK 级别设置为 -1或1,可以保证 Producer 到 Server 之间不会丢失数据,即 At LeastOnce 语义
-
将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义
- At Least Once 可以保证数据不丢失,但是不能保证数据不重复;
- At Most Once 可以保证数据不重复,但是不能保证数据不丢失。
-
Exactly Once
- 重要数据既不重复也不丢失
- 0.11 版本的 Kafka,引入了幂等性:Producer 不论向 Server 发送多少重复数据,Server 端都只会持久化一条
-
消费者数据安全
消费方式
- Consumer 采用 Pull(拉取)模式从 Broker 中读取数据
- Pull 模式不足之处是,如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据
- 因为消费者从 Broker 主动拉取数据,需要维护一个长轮询,针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数 timeout
- 如果当前没有数据可供消费,Consumer 会等待一段时间之后再返回,这段时长即为 timeout
分区分配策略
-
概述
-
将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)
-
分区分配的时机
- 同一个 Consumer Group 内新增消费者
- 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
- 订阅的主题新增分区
-
Kafka 有三种分配策略
- 一个是RangeAssignor(默认)
- 一个是 RoundRobinAssignor
- 一个是StickyAssignor(0.11.x版本开始引入)
-
-
RangeAssignor分配策略
-
原理
- 按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配
- 如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区且同一个topic内分到的分区连续
-
例
-
-
RoundRobinAssignor分配策略
-
策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询消费者方式逐个将分区分配给每个消费者
-
例
-
消费者订阅相同 Topic
-
消费者订阅不同 Topic
-
-
-
StickyAssignor分配策略
-
“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略
-
目的
- ① 分区的分配要尽可能的均匀;
- ② 分区的分配尽可能的与上次分配的保持相同。
- ③ 当两者发生冲突时,第一个目标优先于第二个目标。
-
OffSet
-
生产端offset
-
Kafka接收到生产者发送的消息实际上是以日志文件的形式保存在对应分区的磁盘上。每条消息都有一个offset值来表示它在分区中的位置。每次写入都是追加到文件的末尾
-
如上图所示,它代表一个日志文件,这个日志文件中有 9 条消息
- 第一条消息的 offset( logStartOffset)为 0,最后一条消息的 offset 为 8,LEO(Log EndOffset)为 9 ,代表下一条待写入的消息。
- 日志文件的 HW(Low Watermark)为 6,表示消费者只能拉取到 offset 在 0 至 5 之间的消息, 而 offset 为 6 的消息对消费者而言是不可见的。
- 每个分区副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW。
-
-
消费端offset
-
消费者在消费时,也维护一个offset,表示消费到分区中的某个消息所在的位置。
-
如上图所示,ConsumerA的offset=9,表示ConsumerA已经消费完offset为8的那条数据,提交的offset值为9,下次消费从offset为9的数据开始消费
-
消费者提交的offset值维护在consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid决定
- 计算方式是:groupid的hashCode值对50取余
-
消费者提交offset方式可以是手动提交也可以是自动提交,相关的参数设置是enable.auto.commit
- 参数默认为true,表示每5秒拉取分区中最大的消息位移进行提交。
- 参数设置为false时,需要手动提交offset
-
提交方式有同步提交(commitSync)和异步提交(commitAsync)两种方式
- 同步提交会根据poll方法拉取最新位移进行提交,只要没有发生不可恢复的错误,它就会阻塞消费线程直至提交完成
- 异步提交执行时不会阻塞消费线程,但有可能出现先提交的位移失败了而后提交的位移成功了,
- 如果重试,就会发生重复消费。对此,可设置递增的序号来维护异步提交顺序,也可以在退出或者rebalance前使用同步提交。
-
消费者消费时
- 如果没有对应的offset记录会按auto.offset.reset的配置来消费,默认值为latest,表示从分区末尾开始消费。
- 如果配置为earliest表示从分区起始处开始消费。在代码中也可以通过seek()方法指定分区具体的offset处开始消费。
- 另外,我们也可以重置消费者组的offset
-
消费者消费提交的offset也会被定期清理,对应的参数是
-
offsets.retention.check.interval.ms:
- offset定期检查数据过期周期
-
offsets.retention.minutes
- offset保留时长超过offsets.retention.minutes时间且offset没有改变时,消费者提交的offset会被清理掉
- 再次消费时会按auto.offset.reset配置去消费。此时,会有数据丢失或者重复,可通过重置offset来解决
-
-
存储位置
- Kafka 0.9 版本之前,Consumer 默认将 Offset 保存在 Zookeeper 中,
- 从 0.9 版本开始,Consumer 默认将 Offset 保存在 Kafka 一个内置的 Topic 中,该 Topic 为__consumer_offsets
-
-
checkpoint的offset
-
Consumer重置Offset
- 更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案
Kafka的事务性
前言
- 幂等和事务是Kafka 0.11.0.0版本引入的两个特性,以此来实现EOS(exactly once semantics,精确一次处理语义)
Kafka幂等性
-
幂等,就是指多接口的多次调用所产生的结果和只调用一次是一致的。没有幂等性的情况下就会重复发送数据
-
Kafka的幂等性机制能保证单个分区不会重复写入数据,而实现幂等性的核心就是引入了producerid 和 sequence number这两个概念
-
判断流程
-
理解一
-
每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的
-
对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将对应的序列号的值加1
-
broker端会在内存中为每一对维护一个序列号
- 如果SN_new = SN_old + 1时,broker才会接收它。
- 如果SN_new< SN_old + 1,那么说明消息被重复写入,broker可以直接将其丢弃。
- 如果SN_new> SN_old + 1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,这个异常是一个严重的异常。
-
-
理解二
- Kafka内部会自动为每个Producer分配一个producer id(PID),broker端会为producer每个Partition维护一个<PID,Partition> -> sequence number映射。sequence number时从0开始单调递增的。
- 如果新消息的sequence number正好是broker端维护的<PID,Partition> -> sequencenumber大1,说broker会接受处理这条消息。
- 如果新消息的sequence number比broker端维护的sequence number要小,说明时重复消息,broker可以将其直接丢弃
- 如果新消息的sequence number比broker端维护的sequence number要大过1,说明中间存在了丢数据的情况
-
-
开启
- Properties.put(“enable.idempotence”,true);
Kafka事务
- Kafka事务性主要是为了解决幂等性无法跨Partition运作的问题,事务性提供了多个Partition写入的原子性
- 即写入多个Partition要么全部成功,要么全部失败,不会出现部分成功部分失败这种情况
Flume+Kafka
Kafka Eagle
Kafka 集群消息监控系统
以上是关于大数据之Kafka看这一篇就够了的主要内容,如果未能解决你的问题,请参考以下文章