Kafka相关

Posted 买糖买板栗

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka相关相关的知识,希望对你有一定的参考价值。

目录

1 为什么要使用 MQ 消息中间件:

2 如何保证高吞吐量和消息的可靠传输:

3 kafka如何保证消息的顺序性

4 kafka为什么那么快

4.1 内存缓存池

4.2 Reactor多路复用模型

4.3 写入数据的超高性能:页缓存技术 + 磁盘顺序写

4.4 零拷贝

4.5 Rebalance


1 为什么要使用 MQ 消息中间件:

  • 解耦,要做到系统解耦;
  • 面对大流量并发时容易被冲垮;设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮
  • 存在性能问题(RPC接口基本上是同步调用)即整体的服务性能遵循“木桶理论”,即链路中最慢的那个接口;

2 如何保证高吞吐量和消息的可靠传输:

说明:

一个集群,为什么需要多个Broker(kafka实例),多实例保证一个broker挂了,整个集群还能继续使用,高可用

一个topic,为什么要有多个part:为了方便多个生产者忘多个part并发写入消息,提供吞吐量;

一个part,为什么要有多个副本:为了防止某个broker宕机,导致消息的丢失,提高可用性;

几个名词:

ISR(In-Sync Replicas):副本 同步队列,

OSR(Outof-Sync Replicas):副本 非同步队列

AR(Assigned Replicas)= ISR+OSR

Kafka消息保证生产的信息不丢失和重复消费问题:

  • 使用同步模式的时候,有3种状态保证消息被安全生产,在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。
  • 还有一种情况可能会丢失消息,就是使用异步模式的时候,当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立即丢弃掉。

在数据生产时避免数据丢失的方法:
只要能避免上述两种情况,那么就可以保证消息不会被丢失。

  • 在同步模式的时候,确认机制设置为-1,也就是让消息写入leader和所有的副本。
  • 在异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,这样也能保证数据不会丢失。在数据消费时,避免数据丢失的方法:如果使用了storm,要开启storm的ackfail机制;如果没有使用storm,确认数据被完成处理之后,再更新offset值。低级API中需要手动控制offset值。

数据重复消费的情况,如何处理:

  • 去重:将消息的唯一标识保存到外部介质中,每次消费处理时判断是否处理过(幂等)
  • 不管:大数据场景中,报表系统或者日志信息丢失几条都无所谓,不会影响最终的统计分析结
  • Consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:enable.auto.commit=false  关闭自动提交位移在消息被完整处理之后再手动提交位移。

3 kafka如何保证消息的顺序性

举个例子,你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog 发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增加、修改、删除;你楞是换了顺序给执行成删除、修改、增加,不全错了么。本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。那kafka有什么措施能保证消息的顺序性呢?

原理:Kafka保证同一个partition中的消息是有序的,即如果生产者按照一定的顺序发送消息,broker就会按照这个顺序把他们写入partition,消费者也会按照相同的顺序读取他们。

先说说kafka如何发送(生产)消息:(消息格式:每个消息是一个ProducerRecord对象,必须指定消息所属的Topic和消息值Value,此外还可以指定消息所属的Partition以及消息的Key)

  1. 序列化ProducerRecord
  2. 如果ProducerRecord中指定了Partition,则Partitioner不做任何事情;否则,Partitioner根据消息的key得到一个Partition。这是生产者就知道向哪个Topic下的哪个Partition发送这条消息。
  3. 消息被添加到相应的batch中,独立的线程将这些batch发送到Broker上
  4. broker收到消息会返回一个响应。如果消息成功写入Kafka,则返回RecordMetaData对象,该对象包含了Topic信息、Patition信息、消息在Partition中的Offset信息;若失败,返回一个错误

比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
消费者从 partition 中取出来数据的时候,也一定是有顺序的。

以上即可保证消息顺序的从kafka中获取出来,但是无法保证:三条顺序从kafka中获取的消息,在你自己的业务代码中,有没有顺序执行,考虑到多线程的处理,第一个获取的消息不一定第一个被更新到数据库中;但不用多线程,效率肯定会降低很多;处理方式:写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

4 kafka为什么那么快

4.1 内存缓存池

客户端发消息到服务端,不是一条一条发送的,而是有一个内存缓存池(18.2中有讲到这个概念)的设计思路在里面,当消息累计到一定数量后再发给服务端;内存缓存池避免每次新建立内存导致的GC过于频繁。

4.2 Reactor多路复用模型

Kafka采用的架构策略是Reactor多路复用模型,简单来说,就是搞一个acceptor线程,基于底层操作系统的支持,实现连接请求监听。如果有某个设备发送了建立连接的请求过来,那么那个线程就把这个建立好的连接交给processor线程。每个processor线程会被分配N多个连接,一个线程就可以负责维持N多个连接,他同样会基于底层操作系统的支持监听N多连接的请求。如果某个连接发送了请求过来,那么这个processor线程就会把请求放到一个请求队列里去。接着后台有一个线程池,这个线程池里有工作线程,会从请求队列里获取请求,处理请求,接着将请求对应的响应放到每个processor线程对应的一个响应队列里去。最后,processor线程会把自己的响应队列里的响应发送回给客户端。

4.3 写入数据的超高性能:页缓存技术 + 磁盘顺序写

1)kafka是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。普通的机械磁盘如果你要是随机写的话,确实性能极差,也就是随便找到文件的某个位置来写数据。但是如果你是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身也是差不多的。

2)操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。你在写入磁盘文件的时候,可以直接写入这个os cache里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把os cache里的数据真的刷入磁盘文件中。仅仅这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘。

4.4 零拷贝

从Kafka里我们经常要消费数据,那么消费的时候实际上就是要从kafka的磁盘文件里读取某条数据然后发送给下游的消费者,如图一所示。假设要是kafka什么优化都不做,就是很简单的从磁盘读数据发送给下游的消费者,那么大概过程图二所示:先看看要读的数据在不在os cache里,如果不在的话就从磁盘文件里读取数据后放入os cache。接着从操作系统的os cache里拷贝数据到应用程序进程的缓存里,再从应用程序进程的缓存里拷贝数据到操作系统层面的Socket缓存里,最后从Socket缓存里提取数据后发送到网卡,最后发送出去给下游消费。Kafka为了解决这个问题,在读数据的时候是引入零拷贝技术。也就是说,直接让操作系统的cache中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存,如图三。通过零拷贝技术,就不需要把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,两次拷贝都省略了,所以叫做零拷贝。对Socket缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从os cache中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。而且大家会注意到,在从磁盘读数据的时候,会先看看os cache内存中是否有,如果有的话,其实读数据都是直接读内存的。如果kafka集群经过良好的调优,大家会发现大量的数据都是直接写入os cache中,然后读数据的时候也是从os cache中读。相当于是Kafka完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。

 Kafka中哪里需要选主:

  • Kafka集群中存在多个Broker,需要选举一个当家的Broker;
  • Kafka每个Partition可以有多个副本,需要选举一个当家的副本;Leader副本处理Partition的所有读写请求并维护自身和Follower副本的状态信息,如LEO、HW

名词解释:

  • 控制器:在启动Kafka集群时,每个代理(Broker)都会实例化并启动一个KafkaController,并将该代理的brokerId注册到Zookeeper的相应节点中。Kafka集群中各代理会根据选举机制选出其中一个代理作为Leader,即Leader控制器,当Leader控制器宕机后其他代理再次竞选出新的控制器。控制器作用:负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。
  • HW(HighWatermark):consumer能够看到的此partition的位置,即消费者能消费到的消息,消费者可见的消息,这个涉及到多副本的概念,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。
  • LEO(LogEndOffset):表示每个partition的log最后一条Message的位置。

控制器选举流程:leaderId = -1代表还没有成功选举出leader

副本同步机制:

如上图,分区test-0有三个副本,每个副本的LEO就是自己最后一条消息的offset。可以看到最小的LEO是Replica2的,等于3,也就是说HW=3。这代表offset=4的消息还没有被所有副本复制,是无法被消费的。而offset<=3的数据已经被所有副本复制,是可以被消费的。

4.5 Rebalance

什么是rebalance?

rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。

什么时候rebalance?

这也是经常被提及的一个问题。rebalance的触发条件有三种:

  • 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到)
  • 订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
  • 订阅主题的分区数发生变更
Kafka 消息系统语义概述
  (1) 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
  (2) 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
  (3) 精确的一次(Exactly once):不会漏传输也不会重复传输,每个消息都传输被一次而且仅被传输一次(kafka开启幂等enable.idempotence=true)        

Kafka 判断一个节点是否还活着有那两个条件?
  (1) 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连 接
  (2) 如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

以上是关于Kafka相关的主要内容,如果未能解决你的问题,请参考以下文章

TIDB - 使用 TICDC 将数据同步至下游 Kafka 中

TIDB - 使用 TICDC 将数据同步至下游 Kafka 中

TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中

Debezium系列之:联合主键数据发往kafka topic相同分区

分布式消息队列 NSQ 和 Kafka 对比

详解大数据中必不可少的消息中间件 kafka(3.x 新版本)