kafka可靠数据传递

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka可靠数据传递相关的知识,希望对你有一定的参考价值。

【README】

本文阐述了kafka可靠消息传递机制;

本文部分内容总结于《kafka权威指南》(一本好书,墙裂推荐),再加上自己的理解;


【1】可靠性保证

1,在讨论可靠性时,一般使用保证这个词;

保证指的是, 确保系统在各种不同的环境下能够发生一致的行为; 

2,kafka在哪些方面做了保证呢?

  1. 保证分区消息顺序;
  2. 只有当消息被写入分区所有副本时,它才被认为是已提交的;(无论生产者acks设置为多少)
  3. 只要有一个副本是活跃的,则已提交消息就不会丢失;
  4. 消费者只能读取已提交消息;

【2】复制

1,kafka复制机制和分区多副本架构是kafka可靠性保证的核心;

2,把消息写入多个副本可以使kafka 在发生崩溃时仍能保证消息的持久性;

3,分区:分区是kafka存储数据的基本单位,一个主题的数据被分到多个分区进行存储;分区内的数据是有序的;

4,分区副本:每个分区可以有多个副本;副本又分为首领副本,跟随者副本;生产者消费者只与首领副本交互;跟随者副本只需要及时从首领副本复制最新事件,以与首领副本保持同步;

当首领副本不可用时,其中一个同步副本选举为新首领;(注意是同步副本才可以选举为新首领)

5,跟随者副本成为同步副本的条件

  1. 与zk有一个活跃的会话,即在过去6s内(可配置)向zk发送心跳;
  2. 在过去10s内(可配置)从首领获取过最新消息;

【补充】非同步副本问题

  • 1,如果一个副本在同步与不同步状态间频繁切换,说明集群内部出现了问题,通常是由于jvm 不恰当gc导致的,需要优化系统性能;
  • 2,一个滞后的同步副本会导致生产者消费者变慢因为消息被认为已提交前,客户端会等待所有同步副本接收消息;
    • 2.1, 当一个副本不再同步了,我们就不需要关心它是否接收到消息(参见跟随者副本称为同步副本的条件,即一个同步副本变为不同步中间的时长是可以配置的);
    • 2.2,更少的同步副本,意味着更低的有效复制系统,发生宕机时丢失数据的风险更大;

【3】broker配置

1, broker指的是kakfa服务器,又称中心点(算是知识review了);

2,broker有3个参数影响可靠性(非常重要*)

  1. 副本系数(复制系数,每个分区副本数量);
  2. 是否不完全首领选举(非同步副本在首领不可用时,是否可以被选为首领);
  3. 最少同步副本(消息被认为已提交时,消息被写入的最少副本个数);

2.1)创建topic的复制系数,replication.factor

  • 每个分区有多少个副本; 建议在要求可用性场景里把副本系数设置为3

2.2)是否允许不完全的首领选举,unclean.leader.election

  • 在首领不可用但其他副本都是不同步的,我们应该怎么办?

情况1:分区有3个副本,1个正常的首领副本,2个不可用的跟随者副本;消息被写入首领副本后,首领所在broker宕机了;这个时候,如果之前的一个跟随者副本重新启动,他就成为了分区的唯一不同步副本;

问题来了:是否选择它作为首领副本,即便它是不同步副本?

情况2:分区有3个副本,1个正常首领副本,2个因为网络问题导致同步滞后的跟随者副本;尽管跟随者副本还在复制消息,但已经不同步了;首领副本作为唯一同步副本还在接收生产者消费者请求。这个时候如果首领不可用,另外两个副本就再也无法变成同步的了;

问题来了:是否选择它作为首领副本,即便它是不同步副本?

如何选择?

  • 选择1,如果不同步副本不能提升为新首领,则分区在旧首领恢复前都是不可用的;有时候这种状态会持续数个小时(在旧首领恢复前会导致整个集群不可用,甚至长时间不可用);
  • 选择2,如果不同步副本提升为新首领,则在这个副本变为不同步之后写入旧首领的消息全部丢失,这会导致数据不一致问题

这需要我们根据具体的业务场景在系统可用性和一致性两方面做出权衡

小结:

  1. 如果允许不同步副本成为首领,就要承担丢失数据和出现数据不一致的风险;
  2. 如果不允许不同步副本成为首领,就要接收较低的可用性;因为必须原地等待旧首领恢复正常;

不完全首领选举的意思就是, 允许不同步副本成为首领(unclean.leader.election 设置为true)

2.3)最少同步副本,min.insync.replicas:;

默认情况下,一条消息被写入所有副本,才被认为是已提交的;才可以继续向分区写入和消费下一条数据;

如果设置了 min.insync.replicas=X,则一条消息被写入了X个副本(而无需写入所有副本),则消息就会被认为已提交

如,对于一个包含3个副本的主题,若 min.insync.replicas 设置为2,则至少要存在两个同步副本才能向分区写入数据;若只剩下一个同步副本,集群就变成只读了,这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为(数据不一致);


【4】在可靠系统里使用生产者

即便把broker配置为可靠,生产者若没有进行可靠性配置,仍有可能发生数据丢失风险;

即kafka可靠系统依赖 broker,生产者,消费者这三者的可靠性配置

1)看个例子

情况1,为broker配置了3个副本,禁用不完全首领选举;生产者发送消息设置acks=1;

生产者发送消息A给首领,首领成功写入,并告诉生产者成功写入,但跟随者副本还没有收到这个消息;这是首领崩溃了,而此时,消息还没有被跟随者副本复制过去。

结果:另外两个副本仍然被认为是同步的(毕竟判断一个副本不同步需要一段时间),而其中一个副本称为新首领。

  1. 对于生产者来说, 消息A成功写入了;
  2. 对于消费者来说,因为消息A没有被复制到所有副本,即不会认为已提交,所以消费者是看不到消息A的,它认为数据是一致的,没有丢失消息;

小结:从生产者角度来讲,实际上就是丢失了一条消息;即便kafka系统看起来数据是一致的;

情况2,为broker配置了3个副本,禁用不完全首领选举;生产者发送消息设置acks=all;

生产者往kafka发送消息,分区首领刚好崩溃了,新首领正在选举中,kafka会向生产者返回首领不可用的响应;

这个时候,若生产者没有正确处理错误或没有重试直到发送成功,则消息就有可能丢失;

结果:

  1. 这不算broker可靠性问题,因为broker没有收到这个消息;
  2. 这也不是不一致性问题,因为消费者没有读到这个消息;

问题在于, 生产者没有正确处理错误,弄丢消息的是它自己;

2)如何避免上述两种问题?

  1. 根据可靠性需求配置恰当acks值
  2. 在参数配置和代码里正确处理错误

【4.1】发送确认

acks的3个值, 0, 1 , all;

  • 0 表示 发送到kafka broker就认为写入成功,而不管是否写入首领副本和所有副本;
  • 1 表示消息写入首领副本就算成功;
  • all 表示消息写入所有副本才算成功;

【4.2】配置生产者重试次数

1)错误: 生产者需要处理的错误分为两类,包括自动处理的错误,手动处理的错误;

2)重试: 若broker返回的错误可以通过重试来解决,则生产者自动处理这些错误;

3)错误响应码

  1. 重试之后可以解决的;如 首领不可用错误-LEADER_NOT_AVAILABLE,重试几次,首领选举完成,消息成功写入;
  2. 重试之后无法解决的;如 配置错误,消息大小超过阈值;

注意:重试发送一个已经失败的消息会带来风险,因为如果两个消息都写入成功,则消息重复;这需要消费者在处理消息时保证幂等性

幂等性: 服务器对先后多次相同客户端请求的响应是相同的;如转账;

4)其他错误场景

  1. 不可重试的broker错误,如消息大小超长错误,认证错误;
  2. 消息发送前的错误,如序列化错误;
  3. 生产者达到重试次数上限时或消息占用内存达到上限时的错误;

【5】在可靠系统里使用消费者

1)消费者读取消息时不丢失消息的关键

  • 消费者需要跟踪哪些消息是读取过的,哪些还没有读取;

2)丢失消息

如果消费者1 提交了消息X偏移量T, 却没有处理完消息X,那就有可能造成消息X丢失;

  • 因为如果消费者1宕机,其他消费者接手处理,它是不会再次消费消息X的,会被忽略;

【5.1】消费者可靠性配置

消费者需要注意以下4个参数配置, 如下:

  • 1)group.id:消费者组编号,两个消费者具有相同组id,每个消费者会分到主题消息的子集;如果希望看到所有消息,消费者组编号需要唯一;
  • 2)auto.offset.reset:重置消费者读取消息的偏移量;两个值如下:
    • earliest, 从分区开始位置读取数据;
    • latest,从分区末尾位置读取数据;
  • 3)enable.auto.commit, 启动自动提交偏移量,也可以在代码里手动提交; 取值 [true | false] 
  • 如果在轮询里处理所有数据,那么自动提交可以保证只提交已经处理过的消息的偏移量;
  • 但如果在子线程处理数据,自动提交可能在消息没有处理完就提交了(有风险);
  • 4)auto.commit.interval.ms: 设置自动提交偏移量的频率;默认值是每5秒自动提交一次; 

消费者属性配置例子

/* 1.创建消费者配置信息 */
Properties props = new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
/*2.2开启自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
/*2.3 自动提交的间隔时间*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello04Group01"); // group.id
/*2.6 重置消费者的offset */ 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
/*2.7 关闭自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

【5.2】显式提交偏移量(手动提交)

可靠消费者需要注意的8个事项

1)总是在处理完事件后再提交偏移量;

  • 如果消费处理过程都在轮询里完成,且不需要在轮询间维护状态,可以使用自动提交,或在轮询结束时使用手动提交; 

2)提交频度是性能和重复消息数量之间的权衡;

  • 如果消费处理过程都在轮询里完成,且不需要在轮询间维护状态,可以在一个循环里多次提交偏移量(每循环一次提交一次也是可以的),或者循环退出后提交一次;这取决于在性能和重复处理消息间做出的权衡;

3)确保对提交的偏移量心里有数;

  • 处理完消息后再提交偏移量是非常关键的,否则会导致消费者错过消息;

4)消费者再均衡;

  • 在分区被撤销之前提交偏移量,并在分配到新分区是清理之前的状态;
  • 消费者再均衡在同一个消费者组当中,分区所有权从一个消费者转移到另外一个消费者的机制;

5)消费者可能需要重试;

  • 情况1:在遇到可重试错误时,提交最后一个处理成功的偏移量,然后把没有处理好的消息保存到缓存里(下一个轮询就不会把它覆盖掉),调用消费者的 pause() 来确保其他轮询不会返回数据;如果重试成功或重试次数达到上限,把错误消息丢弃,调用 resume()  让消费者从轮询重新获取新数据;
  • 情况2:在遇到可重试错误时,把错误消息写入另外的主题B(解耦);由主题B的消费者来处理错误;类似于MQ的死信队列;

6)消费者可能需要维护状态;

  • 不建议多个轮询间维护状态,太复杂;
  • 建议尝试使用 kafkaStreams 类库,为聚合,连接,时间窗和其他复杂分析提供了高级的dsl api;

7)长时间处理;

  • 暂停轮询时间不能超过几秒钟;即使不想获取更多数据,也要保持轮询,这样消费者才会往 broker 发送心跳;才不会发生消费者再均衡;
  • 推荐做法: 把数据交给工作线程(线程池)去处理,然后暂停消费者但保持轮询(以防止消费者再均衡),不获取新数据;当工作线程处理完成后,让消费者继续获取新数据;(干货——消费者处理大量数据的推荐做法

注意: 区分暂停轮询 与 暂停消费者间的区别;

8)仅一次传递;

  • 方案1: 最常用的方法是把结果写到一个支持唯一键的系统里,如存储引擎,关系型数据库,es;要么消息自带一个唯一键,要么使用消费者组+主题+分区+偏移量的组合创建唯一键;这可以唯一标识一个kafka记录;且消费者逻辑保证幂等性即可
  • 方案2:若消费者系统支持事务,可以把消息和偏移量持久化到数据库;当消费者启动时,从数据库读取偏移量,并调用seek() 方法从该偏移量的下一个位置读取数据即可;

【6】验证系统可靠性

【6.1】配置验证

  1. 首领选举: 如停掉首领所在broker会发生什么?
  2. 控制器选举:如停掉(重启)控制器所在broker 会发生什么 ?
  3. 依次重启:依次重启broker 不会丢失数据吗?
  4. 不完全首领选举测试(不同步副本可以选举为首领副本):如果依次停止所有副本,然后启动一个不同步的broker会发生什么 ?要怎样才恢复正常 ?

【6.2】应用程序验证

  1. 客户端从服务器断开连接;
  2. 首领选举;
  3. 依次重启broker;
  4. 依次重启生产者;
  5. 依次重启消费者;

【6.3】在生产环境监控可靠性

1)kafka的java客户端包含了 JMX 度量指标;  可以监控客户端的状态和事件;

  • 对于生产者,最重要的可靠性指标是 error-rate 和 retry-rate ;
  • 对于消费者,最重要的可靠性指标是 consumer-lag ;

以上是关于kafka可靠数据传递的主要内容,如果未能解决你的问题,请参考以下文章

Kafka八股文梳理

kafka传递消息的三种方式

主流消息中间件优劣:ActiveMQ,RabbitMQ,Kafka,RocketMQ

学习笔记Kafka—— Kafka简介

学习笔记Kafka—— Kafka简介

rabbitmq和kafka的区别