Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性
Posted roykingw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性相关的知识,希望对你有一定的参考价值。
当你仔细看下源码中对于acks属性的说明,会看到另外一个单词,idempotence。这个单词的意思就是幂等性。在Producer发送消息到Broker的这个场景中,幂等性是表示Producer不论向Broker发送多少次重复的数据,Broker端都只会保留一条消息,而不会重复保存多条消息。
先来看生产者端对于幂等性属性的介绍
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+ " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+ ACKS_CONFIG + "</code> must be 'all'. "
+ "<p>"
+ "Idempotence is enabled by default if no conflicting configurations are set. "
+ "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
+ "If idempotence is explicitly enabled and conflicting configurations are set, a <code>ConfigException</code> is thrown.";
这段介绍中涉及到另外两个参数,也一并列出来
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
这一段描述更多的是在解释如何在Producer发送者端使用idempotence幂等性。只涉及到几个参数,ACKS_CONFIG、ENABLE_IDEMPOTENCE_CONFIG、MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION、MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE、RETRIES_CONFIG。但是却并没有详细解释幂等性如何实现。实际上这里涉及到了Kafka对于幂等性的一整套设计思想。
这里首先需要理解分布式数据传递过程中的三个数据语义:at-least-once:至少一次;at-most-once:最多一次;exactly-once:精确一次。
比如,你往银行存100块钱,这时银行往往需要将存钱动作转化成一个消息,发到MQ,然后通过MQ通知另外的系统去完成修改你的账户余额以及其他一些其他的业务动作。而这个业务动作的数据安全性,往往是需要分层次来设计的。首先,你要保证存钱的消息能够一定发送到MQ,如果一次发送失败了,那就重试几次,只到成功为止。这就是at-least-once至少一次。如果保证不了这个语义,那么你肯定不会接受。然后,接下来就要保证这个消息在MQ中最多只会记录一次,而不会因为网络抖动或者重试等其他原因被重复记录多次。这就是at-most-once最多一次。如果保证不了这个语义,那么银行肯定也不能接收。最后,这个业务动作要让双方都满意,就必须保证存钱这个消息正正好好被服务端记录了一次,不多也不少。这就是Exactly-once语义。
所以,通常意义上,at-least-once可以保证数据不丢失,但是不能保证数据不重复。而at-most-once保证数据不重复,但是又不能保证数据不丢失。这两种语义虽然都有缺陷,但是实现起来相对来说比较简单。但是对一些敏感的业务数据,往往要求数据即不重复也不丢失,这就需要支持Exactly-once语义。而要支持Exactly-once语义,需要有非常精密的设计。
回到Producer发消息给Broker这个场景,如果要保证at-most-once语义,可以将ack级别设置为0即可。而如果要保证at-least-once语义,就需要将分区的副本数设置为大于等于2,保证ISR里应答的最小副本数大于等于2,再将ack级别设置为-1或者all。那要支持Exactly-once语义怎么办呢?这就需要使用到idempotence幂等性属性了。
Kafka为了保证消息发送的Exactly-once语义,增加了几个概念:
- PID:每个新的Producer在初始化的过程中就会被分配一个唯一的PID。这个PID对用户是不可见的。
- Sequence Numer: 对于每个PID,这个Producer发送的数据的每个<Topic,Partition>目标组合都会分配一个Sequence Number。这是一个从0开始单调递增的数字。Broker端在写消息时,会同时保存这个Sequence Number。
这样,Kafka在打开idempotence幂等性控制后,在Broker端就会保证每个具有相同的<PID,Partition,Sequence Number>信息的消息,Broker端只会持久化一条。这样就能保证at-most-once语义。再加上ack级别为-1或all,保证at-least-once语义,这样就整体上保证了Exactaly-once语义。
以上是关于Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性的主要内容,如果未能解决你的问题,请参考以下文章
Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性