Kafka 消息队列
Posted Blue Protocol
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 消息队列相关的知识,希望对你有一定的参考价值。
目录
主流的消息队列
目前我们市面上比较常见的消息队列主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。但是这么多的消息队列我们如何进行选择呢?
在大数据的场景中我们主要采用Kafka作为消息队列。我们在JavaEE的开发当中主要采用ActiveMQ、RabbitMQ、RocketMQ。
消息队列的应用场景
虽然我现在学了几个消息队列,但是我自己对他没有什么明确的定义。
那现在就一定要记住它的功能。
传统的消息队列的主要应用场景包括:缓存/肖锋、解耦和异步通信。
缓存/肖锋
缓存/肖锋:有利于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
比如有下面这种情况:我们在大型活动中比如像双十一这样的活动。比如双十一参与的用户,每秒可以达到10亿/s,来进行一次秒杀,但是我们后台的最大的处理能力只有1千万/s,但是我这个系统扛不住这个海量的数据,但是我还要搞这次活动怎么办,那么我们就可以使用消息队列来处理。
这个情况下,我们的消息队列就起到了非常重要的作用。我们可以将我们的10亿次/s的数据先缓存到消息队列MQ中,一条一条缓存进来。然后再按照我们的处理系统1千万/s的处理能力,多花个几秒钟的时间就可以把它给处理完了。这也是消息队列中一个非常典型的应用场景。
解耦
解耦:允许在消息队列两边修改或者是独立拓展处理,只需要确保他们遵循MQ统一的约束接口就行。
比如我们需要将我们的多个数据源如mysql、MongoDB等传送到我们的数据处理框架中如SpringBoot、Spark中。如果没有消息队列的话,那我们就要对他们进行一一处理,编写相应的框架代码,这样开发成本很高、而且后续的修改也需要一一修改,这样麻烦。但是如果我们有了消息队列,我们的数据源只需要往消息队列里面写数据就行了,然后我们后端处理框架需要数据的,只需要从MQ中去取就行了,这样就减少了开发成本,有一个解耦的作用。
异步处理
异步处理区别于同步处理
同步处理就是一步一步把所有的事情都给做完,比如我们用户填写注册信息,然后后台将用户信息写入数据库,然后在调用发送短信接口来发送短信,假设发送短信这个过程需要等待3秒,然后之后页面才响应注册成功。
而**异步处理是我们把核心的事情处理完毕之后,其他另一些不太重要的事情交给MQ去处理,哪怕失败了没有关系,不影响核心任务的完成。**如我们上述说的如果用异步来进行处理的话,用户填完注册信息之后,我们后台将用户信息写入数据库,这个时候,我们就可以响应给用户注册成功了,而发送短信的这个过程交给异步来做。这样子用户就不需要进行等待了。
但是异步处理也不代表不重要,像现在大多数电商分布式场景基本都是使用异步处理的,相反异步处理带来的重试机制反而可以提高业务的成功率的。
我们使用异步处理的主要目的是:减少请求响应时间,实现流程异步化,提高系统响应性能。
Kafka
Kafka的定义
什么是Kafka?
Kafka是一个基于分布式的、具有高吞吐量的发布、订阅模式的消息队列,主要应用于大数据领域,它可以承担大数据的存储、分析和计算。
Kafka的底层基础架构
首先我们先来说一下,Kafka的基本组成。
首先是Kafka 的Producer生产者,用来对接外部传进来的数据的。
然后是Kafka 的Consumer消费者,是用来消费Kafka的数据的。
再有就是Kafka的中间部分,我们Kafka一般都是基于分布式的,所以中间是我们的Kafka集群,集群中存储的是Kafka的Topic主题,用来存储各种各样的数据的地方。(我们Topic主题的分区,可以放在不同的服务器上)
假设我们现在有100T的数据,我们需要把这100T的数据传入到Kafka里面,同时做一个消费,100T的数据,一般情况下,我们的单独一台服务器来存储,肯定是存不下的。所以需要海量的数据,我们需要将数据分而治之,所以Kafka也采用了这种思想。Kafka把一个Topic主题分成多个Partition分区,这样我们一个Topic每个Partition就可以分别承担存储数据的压力。
为了配合Kafka集群中主题分区的设计,Kafka官方提出了消费者组的概念,组内可以多个消费者,并且为了后续的管理,主题中的一个分区,只能由消费者组内一个消费者来消费。
而为了提高kafka的高可靠,为每一个Partition分区增加了副本,并且分区的副本有Leader和Follower之分,这里需要注意的是,无论是Kafka的生产者和消费者,来处理数据的时候,只针对Leader分区进行生产和消费,Follower主要是用来当作备份,如果Kafka分区Leader挂掉之后,Follower副本有条件可以成为Leader。
在Kafka2.8.0之前,Kafka必须配置Zokeeper进行使用的,Kafka一些数据是存储在Zookeeper里面的,Zookeeper集群主要存储Kafka中哪些brokers服务器上线了,并且记录了每一个分区谁是Leader。而在Kafka2.8.0之后,Zookeeper是可选的,我们可以选择Zookeeper来存储信息,也可以使用Kafka的Kraft模式。
这就是Kafka的基础架构。
Kafka分区如何保证Leader选举
Kafka集群中每一个broker都会有一个Controller,而只有一个broker的Controller会被选举为Controller Leader,它负责管理Kafka broker集群的上下线并负责Leader的选举工作。而Kafka分区的Leader和Follower的信息同步工作是依赖于Zookeeper的。
假设我们已经启动了Zookeeper集群和Kafka集群。
Kafka每启动一个节点Broker就会在Zookeeper当中注册一个节点信息。
而Kafka的每一个节点Broker都会有Controller,而这些Controller会抢着去Zookeeper集群中注册controller,谁先注册到,谁就可以成为Controller,负责Leader的选举工作。
假如我们Kafka Broker0抢到了Controller Leader,那么Broker0第一时间会监控Zookeeper当中对应的kafka集群信息brokers/ids/里面的节点变化,里面如果有任何信息变化都能快速的捕捉到。
下面我们来讲一下Kafka中Leader真正的选举策略。
选举规则:在ISR 中存活为前提,安装AR中排在前面的优先。
例如ISR[1,0,2],AR[1,0,2],那么Leader就会安装1,0,2的顺序轮询。
选举出来后,Controller Leader将对应的主题的ISR和Leader信息上传到Zookeeper,防止Controller Leader节点挂了,其他节点可以快速识别信息。
现在理顺一下:假设我们有三个Kafka Broker0/1/2 , 1 为Leader,假设broker1挂掉了,那么在Zookeeper中kafka集群信息brokers/ids/中就会去掉broker1,而broker0(Leader Controller)已经监控了Zookeeper中kafka集群信息的变化,首先它会在Zookeeper集群中拉出对应的主题的ISR和Leader信息,拉过来之后,就按照上述的选举规则进行选举,Leader选举出来之后,Controller Leader将对应的主题的ISR和Leader信息上传到Zookeeper。
Kafka分区如何保证Leader和Follower数据的一致性
Leader和Follower数据的一致性、Kafka数据的同步机制都是一致的说法。
在我们探讨Kafka底层的基础架构的时候,我们说Kafka的Topic主题分区有Leader和Follower之分,而无论是Kafka的生产者和消费者,来处理数据的时候,只针对Leader分区进行生产和消费,而所有的Follower都是从Leader中拉取数据进行数据 同步的,由于生产和消费只针对Leader,所以Leader一般数据较多,而Follower较少,那Kafka到底是怎么样同步的?
假设我们有三个Kafka节点broker0/1/2,broker0为Leader。
首先我们需要明白几个概念
LEO:每个副本最后一个offset+1。就像我们的数组a[8],LEO就是8。
HW(High Watermark)高水位线:所有副本最小的LEO。消费者能够看到最大的数据就是HW-1。
Leader和Follower数据的一致性分为两种情况
1)Leader故障
(1)Leader发生故障之后会被踢出ISR,然后根据Leader选举规则选举新的Leader。
(2)由于生产和消费只针对Leader,所以Leader一般数据较多,而Follower较少,为保证多个副本数据之间的一致性,其余的Follower会先将给的log文件高于Leader的HW的部分截掉,然后大家的数据就都一致了。
2)Follower故障
(1)Follower发送故障之后会被踢出ISR。
(2)这个期间存活的节点继续接收数据。
(3)待该Follower恢复后,Follower会读取本地磁盘记录上一次的HW,并通过log文件将高于HW的部分截取掉,从HW开始向Leader继续同步。
(4)等该Follower的LEO达到该Topic的HW,该Follower就可以重新加入ISR了。
由上述的过程我们可以知道Kafka只能保证数据的一致性,并不能保证数据不丢失或者不重复。
Kafka 中消费者的消费方式
在正常情况下,MQ消息队列中,消费者有两种消费方式:由MQ集群推(push)和由消费者进行拉(pull)。
而Kafka采用的是消费者主动拉取(pull)的方式。
如果Kafka采用Push推的方式,消费者方面各消费速度可能不同,很难适应所有的消费者的消费速率。如Broker是推10ms/s、20m/s还是50m/s。
而且push的方式速度固定,忽略了消费消费能力,如果速度太快可能导致拒绝服务或者排队堵塞的情况。
拉取(pull)模式中
优点:
(1)可以根据消费者consumer的消费能力进行拉取速率,起到一个控制速率的作用。
(2)pull中的拉取可以是批量拉取,也可以是单条数据的拉取。
缺点:
如果kafka集群中没有数据,消费者可能陷入空循环当中,一种返回空数据,消耗资源。
Kafka 高效读写数据的原因(高性能吞吐的原因)(重点)
(1)Kafka本身是分布式集群,采用了分区技术,也采用了消费者组并行消费数据,并行度高。
Topic主题采用了分区技术,针对于生产者,提高了生产者存储数据的并行度,分区也解决了存储大数据量的问题,针对于消费者,采用了消费者组的方式来消费分区的数据,提高消费者消费数据的并行度。
(2)Kafka写数据的时候,是按照顺序写入磁盘的。
Kafka生产者生产数据的时候,要写入log文件中,写的过程是一直追加到文件的末端,为顺序写。官方有数据表名,同样的磁盘,顺序写的速度能到600M/s,而随机写只有100K/s。这样磁盘的结构有关,顺序写之所以快,是因为省去了大量磁头寻址的时间。
(3)Kafka读数据的时候,采用稀疏索引,可以快速定位要消费的数据。
这个我们可以从Kafka的文件存储机制说起,在Kafka集群中,Topic是逻辑上的概念,而Partition分区是物理上的概念,所以Topic主题在硬盘上是以分区的形式存在的。一个Topic分为多个Partition,一个Partition对应一个log,而这个log文件中存储的就是Producer生成的数据。Producer生产的数据会被不断追加到该log文件的末端。为防止log文件过大导致数据定位效率低下,kafka采用了分片和索引机制,将每个partition分为多个Segment。每个Segment大约是1G左右,其中每个segment包括:.index文件,.log文件和.timeindex文件。Kafka生产者在生产日志的时候,Kafka集群存储日志的时候,索引是按照稀疏索引的方式进行存储的,大约每往.log文件中存储4KB的数据,就会在.index中写入一条索引。因为Segment大约1个G,如果我们直接查找1个G的文件,会比较慢,所以Kafka在Segment中创建了一个索引,方便对数据的定位。
(4)零拷贝技术和页缓存。
pageCache叶缓存:kafka依赖底层操作系统提供的pageCache功能。当上层有数据操作时,操作系统将数据写入叶缓存中,当读数据操作发生时,先从页缓存中找,如果找不到,再从磁盘中读取。pageCache页缓存是让多的内存当做磁盘缓存来使用。
零拷贝技术:Kafka对数据的加工和处理都交由Kafka的生产者和Kafka消费者去做,Kafka 集群应用层不关心存储的数据,所以就不用走应用层,传输效率高。
Kafka在页缓存中获取到数据之后,如果没有零拷贝技术,走的是应用层,需要把数据传输到应用层,然后再重新发送到操作系统内核中的Socket套接字缓存,之后再传输给网卡。而又了零拷贝技术,获取到页缓存获取到数据之后直接走网卡,传输效率高。
Kafka 数据可靠性(如何实现高可靠)
Kafka 数据的高可靠,需要靠Kafka生产者、Kafka集群和Kafka消费者三方面共同合作完成。
生产者数据可靠性
生产者数据的可靠性,只要是根据ack应答机制来决定的。
- ack = 0:生产者发送消息过来之后,不需要进行ack应答,生产者可以发送下一波数据。
- ack = 1:生产者发送新数据过来,Leader收到数据就进行ack应答,生产者可以发送下一波数据。
- ack = -1:生产者发送新数据过来,Leader和所有的Follower节点收到数据之后就行ack应答。
所以,在Kafka生产者中保证数据的可靠性,我们需要设置ack=-1。
Kafka集群数据可靠性
如果生产者ack应答设置为-1,可能会出现一个问题,生产者发送数据过来之后,Leader收到数据,所有Follower都开始同步数据,但如果有一个Follower因为发送故障,迟迟不能与Leader进行同步,怎么解决。
我们来看看kafka是如何解决的,Kafka让Leader维护一个动态的ISR队列,ISR队列为和Leader保持同步的Follower+Leader集合。其过程是如果Follower长时间未向Leader发送通信请求或者同步数据,那么该Follower将会被踢出ISR队列。
这样就不需要等长期联系不上或者是已经故障的节点了,保障了Kafka集群的健康。
那么,我们来假设一种情况,如果Topic主题分区的副本数设置为1个,或者ISR里的副本数量只有1个,这样的效果就相当于只有一个Leader,和ack=1的效果是一样的,这样如果Leader挂掉之后就会产生丢数据的风险了。
所以,在Kafka集群中数据的可靠性,我们需要保证主题分区副本数>=2 + ISR里最小副本数>=2。
数据完全可靠的条件 : ack = -1 + 主题分区副本数>=2 + ISR里最小副本数>=2
数据虽然完全可靠,但是会出现数据重复问题,比如Leader在和Follower中的节点同步之后,已经存了数据了,在要发送ack应答的一瞬间,这个时候,Leader挂掉了,应答没有成功,这个时候又会重新选举出来新的Leader,由于没有应答成功,Kafka生产者会进行重试,会再次发送一次一模一样的数据,这个时候就出现数据重复了。
我们数据完全可靠会实现至少一次,而如果我们需要做到数据精确一次,既数据不能重复也不能丢失,比如和钱相关的数据就需要这样。
所以,我们在保证数据完全可靠的条件下,我们还需要使用Kafka的幂等性务来实现精准一次。
Kafka幂等性是指:无论生产者发送多少重复数据给Kafka集群,Kafka集群之后持久化一次,保证了数据不重复。而新版的Kafka幂等性配置默认是打开的。
1)生产者角度
- acks设置为-1 (acks=-1)。
- 幂等性(enable.idempotence = true) + 事务 。
2)broker服务端角度
- 分区副本大于等于2 (–replication-factor 2)。
- ISR里应答的最小副本数量大于等于2 (min.insync.replicas = 2)。
3)消费者
- 事务 + 手动提交offset (enable.auto.commit = false)。
- 消费者输出的目的地必须支持事务(MySQL、Kafka)。
Kafka 消息丢失的场景及解决方案
Kafka消息丢失的场景:
1、ack = 0,生产者发送完消息之后不应答,如果发送失败消息就丢失了。
这种情况我们可以让ack = -1来解决,让生产者发送消息之后,Leader和Follower收到消息之后再进行应答。
2、Topic主题分区的副本数设置为1个,或者ISR里的副本数量只有1个,这样的效果就相当于只有一个Leader,和ack=1的效果是一样的,这样如果Leader挂掉之后就会产生丢数据的风险了。
这种情况下,要保证分区副本数>=2 + ISR里面的副本数量>=2。
Kafka 数据乱序(重点)
数据乱序,是指Kafka生产者通过Sender线程最多给Kafka集群发送五个请求,假如现在发送4个数据1234的时候出现了这种情况,1、2发送成功,3发送失败了进行重试,4发送成功了,这个时候就出现了1、 2 、4 、3的顺序,这就导致了数据乱序。
解决方案如下:
1)Kafka在1.x版本之前要保证数据单分区有序,
在不需要考虑是否开启幂等性的情况下,设置max.in.flight.requests.per.connection = 1
2)Kafka1.x版本之后要保证数据单分区有序,
(1)在未开启幂等性的情况下,设置max.in.flight.requests.per.connection = 1。
(2)在开启幂等性的情况下,需要设置max.in.flight.requests.per.connection 小于等于5,原因是在Kafka1.x以后,启用幂等性后,Kafka服务端会缓存producer发来最近5个的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
Kafka 提高吞吐量(Kafka调优)
我们来说一种数据积压的场景,在Kafka当中默认的日志存储时间为7天,过了7天之后就会被删除,如果我们消费某个主题的数据,我们已经消费4天了,但是也只是消费者到10%,那么再过三天也消费不了,就这是数据积压。
解决数据积压的方式:提高消费者的吞吐量。
消费者:
(1)如果是Kafka的消费能力不足,我们可以考虑增加Topic主题的分区数,并同时提升消费者组的数据,让消费者数 = 分区数。
(2)如果是我们消费者消费数据能力过剩,而从集群中拉取的批次数据过少,使我们处理的数据小于生成的数据,也会造成数据积压。这个时候我们可以通过参数提高我们批次拉取的数据,从500条增加到1000条(max.poll.records),这里需要注意一下,默认每批次拉取的数据是50M,计算方式是:条数*日志大小,如果总大小大于50M,我们也需要通过参数提高50M这个值(fetch.max.bytes)。
提高生产者的吞吐量,在提生产者的吞吐量前,我们需要知道几个参数的意义
bitch.size:批次发送数据大小,默认16K。
linger.ms:等待时间,默认0ms。
compression.type:压缩类型。
RecordAccumulator:Kafka发送数据的缓冲区大小,默认是32M
了解完这几个核心参数之后,我们还需要知道生产者吞吐数据的流程:
在生产者发送数据的队列里面,在linger.ms等待时间里面,如果我们的数据迟迟未到达bitch.size,生产者sender等待linger.ms之后就会发送数据。linger.ms的默认值为0ms,说明如果我们不对它进行设置,那么队列里面只要有数据,sender线程就可以从队列里面拉取数据并发送到Kafka集群中,0延迟。并且batch.size在linger.ms为0ms的面前是不起作用的,触发我们把这个时间加长,bitch.siez才会起作用。这种来一个拉一个的效率其实不高。
一般我们会这样设置这几个参数的大小来提高Kafka生产者的吞吐量。
batch.size:每批发送数据的大小从默认的16K修改为32K。
linger.ms:等待时间,从默认的0ms修改为5-100ms(但是我们需要注意修改linger.ms会导致延迟变大,适当调节就好)
compression.type:压缩类型,我们使用snappy压缩,如果我们将数据压缩之后,能拉取的数据就更多了。
RecordAcumulator:缓冲区大小,也就是我们的队列,我们可以把默认的32M修改为64M。(buffer.memory)
以上是关于Kafka 消息队列的主要内容,如果未能解决你的问题,请参考以下文章