如何保证kafka生产者发送消息的可靠性

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何保证kafka生产者发送消息的可靠性相关的知识,希望对你有一定的参考价值。

参考技术A 继一年前的 kafka介绍的学习总结 ,生产者Producers按照主题topic把消息发给kafka集群的主分区,其他分区从主分区同步该消息。

具体来看kafka的分布特性性:kafka消息的分区分布在Kafka集群的某些服务器上,每个分区都有一个服务器充当leader,有0个或多个充当follower。leader处理分区的所有读请求和写请求,同时follower被动的从leader同步数据。假如leader异常了,其他follower会自动的选出一个新leader。每个服务器有可能充当某些分区的leader,同时也充当其他分区的follower,因此集群负载得到了很好的平衡和实现容错功能。

Kafka默认的副本因子是3,即每个分区只有1个leader副本和2个follower副本。

由于Kafka是一个分布式系统,follower必然会存在与leader不能实时同步的风险,那么follower副本在什么条件下才算与Leader同步?ISR同步副本机制解决这个问题。

In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本。ISR中是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中,而follower副本是否在ISR中,取决于该follower副本是否与leader副本保持了“同步”。

Kafka的broker服务端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。只要follower副本落后于leader副本的时间间隔不超过10秒,就认为该follower副本与leader副本是同步的,即使follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出局。 如果follower副本被踢出ISR列表, 等到该副本追上了leader副本的进度,该副本会被再次加入到ISR列表中,所以ISR是一个动态列表,并不是静态不变的。

acks参数主要决定了kafka集群leader分区副本接收消息成功就响应成功还是fellower分区从leader同步成功才响应成功,这个参数对于消息是否丢失起着重要作用:

1) acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。由于不需要等到服务器的响应,可以以网络支持的最大速度发送消息,从而达到很高的吞吐量。

2) acks=1,只要集群的leader分区副本接收到了消息,就会向生产者响应成功。一旦消息无法写入leader分区副本(比如网络原因、leader节点崩溃),生产者会收到一个错误响应,为了避免数据丢失,生产者会重新发送消息。这种方式的吞吐量取决于使用的是异步发送还是同步发送。

3) acks =all,只要ISR同步副本数大于等于最小同步副本数min.insync.replicas( 提醒:ISR是动态的 )收到消息时,生产者才会接收到服务器的成功响应。这种模式是最高级别的,也是最安全的,可以确保不止一个Broker接收到了消息,该模式的延迟会很高。

当acks=all时,只要ISR同步副本中有主备副本都同步了才会响应成功给生产者。其实这里面存在一个问题:ISR同步副本是动态的,有可能仅仅含有一个leader副本(相当于acks=1),也有可能的全部副本(这个也没必要,拜占庭将军场景只要保证一半以上的副本正常同步)。需要一个参数决定至少有几个副本需要同步成功才能响应成功给生产者。

为了解决这个问题,Kafka的Broker端提供了一个参数**min.insync.replicas**,该参数控制着至少被写入的副本数,该值默认值为1,生产环境中可以根据部署的是单节点还是多节点,多节点要能够满足拜占庭将军场景,我们以3节点场景为例。

3节点场景1: 当min.insync.replicas=2且acks=all时,如果ISR列表只有[1,2],3被踢出ISR列表,只要保证2个副本同步了,生产者就会收到成功响应。

3节点场景2 :当min.insync.replicas=2时,如果ISR列表只有[1,2],3被踢出ISR列表。当acks=all时,则响应失败(需要生产者重新发消息直到响应成功);当acks=0或者acks=1时成功写入数据。

ps:该场景下acks=all,kafka集群一旦同步失败就直接响应失败嘛?还是有超时时长?生产者已经将消息发送到leader分区,kafka(响应失败)对这个消息如何处理?入集群持久化嘛?后续重试发送的消息如何处理?

3节点场景3 :如果min.insync.replicas=2且acks=all,此时ISR列表为[1,2,3],只要2副本同步成功还是等到所有的副本都同步了,才会向生产者发送成功响应?因为min.insync.replicas=2只是一个最低限制,同步副本少于该配置值,则会抛异常,而acks=all,是需要保证所有的ISR列表的副本都同步了才可以发送成功响应。

1) 要想系统的可靠性,从来不是一方能决定的, kafka生产者发送消息的可靠性 主要由 kafka服务端 的动态同步副本列表ISR和最小同步副本数min.insync.replicas以及 生产者 参数ack=all。

2) kafka服务端的最小同步副本数min.insync.replicas由部署的集群和节点个数来决定,满足拜占庭将军场景(节点个数一半以上即可)。

3) 生产者的 副本个数 由部署的集群和节点个数来决定,满足拜占庭将军场景(节点个数一半以上即可)。如果是单副本的话,本文讨论的就没意义了。

4) kafka单节点场景,kafka服务端的动态同步副本列表ISR和最小同步副本数min.insync.replicas均为1,生产者的副本数为1(如果大于1估计会失败),ack=all和ack=1的效果一样。

5) kafka 3节点场景,kafka服务端的动态同步副本列表ISR为3个,最小同步副本数min.insync.replicas均为2,生产者的副本数为2和ack=all。

6) kafka 奇数n节点场景,kafka服务端的动态同步副本列表ISR为n个,最小同步副本数min.insync.replicas均为(n+1)/2,生产者的副本数为(n+1)/2和ack=all。

Kafka生产者ack机制剖析

kafka官网给出的kafka生产者配置参数

Kafka生产者——结合spring开发

Kafka生产者端

可靠性保证:

producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据:

1、不丢失

2、不重复

producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性。

acks 参数配置

为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到 ack,就会进行下一轮的发送,否则重新发送数据。

0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;

-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前leader 发生故障,那么会造成数据重复。

Exactly Once 语义

当ack级别设置为-1的时候,可以保证producer到broker之间不会丢失数据,即At
Least Once 语义 。相对的,将服务器ack级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once 语义 。

At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。

对于一些重要信息,我们要求既不能重复也不能丢失,这时我们需要使用Exactly Once 语义 。0.11 版本的 Kafka,引入了一项重大特性:幂等性。 所谓幂等性就是producer无论向broker发送了多少次重复数据,broker都只会持久化一条。幂等性结合At Least Once语义,就结合成了Kafka的Exactly Once语义。
At Least Once + 幂等性 = Exactly Once
启动幂等性,只需要将Producer的参数enable.idompotence 设置为true,ack设置为-1即可。

开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个分区的消息会附带Sequence Number(自动增长)。Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息提交的时候,broker只会持久化一条消息。
msg1<PID:1,Partition:1,SeqNumber:0,message : a >
msg2<PID:1,Partition:1,SeqNumber:1,message : b >
msg2<PID:1,Partition:1,SeqNumber:2,message : c >

但是,PID重启就会变化,同时不同分区也会有不同主键,所以幂等性无法保证跨分区跨会话。这里我们就需要引进kafka事务。

事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。为了实现跨分区跨会话事务,引入一个全局唯一的Transaction id ,将pproducer的pid和Transaction id进行绑定,这样,当producer重启后,就可以通过Transaction ID 获得原来的 PID。这个参数通过客户端程序来进行设定 。

我们使用kafka消息事务的场景有以下两种:

  1. 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
  2. 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。

事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时
需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设
置为false,则会抛出异常。


以上是保证producer发送数据可靠性保证的相关参数,结合spring-kafka的具体使用如下。

spring-kafka生产端

spring-kafkaProducer.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
    <context:component-scan base-package="producer" />
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--broker集群地址-->
                <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                <!--acks 参数配置-->
                <entry key="acks" value="all"/>
                <!--发送失败重试次数-->
                <entry key ="retries" value="3"/>
                <!--批次发送大小的内存阀值-->
                <entry key="batch.size" value="16384"/>
                <!--批处理延迟时间上限-->
                <entry key="linger.ms" value="1"/>
                <!--开启幂等性-->
                <entry key="enable.idempotence" value="true"/>
                <!--批处理缓冲区-->
                <entry key="buffer.memory" value="33554432"/>
                <!--key序列化器-->
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--value序列化器-->
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!--配置一个生产者监听器,在该类写发送成功或失败的回调方法-->
    <bean id="producerLisener" class="producer.KafkaSendResultHandler"></bean>
    <!--springkafka提供的发送类,对kafka发送方法进行加强性的封装-->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="myTopic"/>
        <property name="producerListener" ref="producerLisener"></property>
    </bean>
    <!--producer工厂-->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>
</beans>

部分重要参数详解:

acks:

? 这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功
的。

  • ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知
    不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最
    大速度发送消息,从而达到很高的吞吐量。

  • ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响
    应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收
    到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,
    如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导
    致数据丢失。

  • ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,
    这种模式是最安全的,它可以保证不止一个服务器收到消息。

    注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常

retries :

? 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到
了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待
100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。

batch.size :

? 当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可
以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出
去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能
被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置
的太小,生产者会因为频繁发送消息而增加一些额外的开销。

max.request.size :

? 该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里
所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两
边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。

linger.ms:批处理延迟时间上限

buffer.memory:批处理缓冲区

enable.idempotence:是否开启幂等性

ProducerListener类

消息发送后的回调方法,注意的是,这里的监听回显的数据时要发送的数据,不是返回的数据,可以通过日志来观察发送数据是否正确。

public class KafkaSendResultHandler implements ProducerListener {
   private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        log.info("kafka message send successful : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value+"---RecordMetadata:"+recordMetadata);
    }

    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        log.error("kafka message send fail : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value);
        e.printStackTrace();
    }

    public boolean isInterestedInSuccess() {
        log.info("ProducerListener started");
        return true;
    }
}

ProducerClient类

对kafkaTemplate的再一次封装,kafka在消息发送的时候发送方式可以分为同步发送和异步发送。

同步发送:

? 同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。

  //同步发送
   public void syncSend(){
    testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);
   }
    

异步发送:

//异步发送
   public void asyncSend() {

      ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());

      future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                    @Override
                    public void onSuccess(SendResult<Integer, String> result) {
                       log.info("success");
                    }
                    @Override
                    public void onFailure(Throwable ex) {
                       log.error("failure");
                    }
                });
}

ProducerClient对kafkaTemplate的封装(不带事务)

这里只封装了最简单的发送方法,同时可对其他发送方法进行封装,只需要修改传参即可。

public class ProducerClient {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    /*同步发送*/
    //轮询方式发送
    public void sendMessage(String topicName,String message){
        Map<String,Object> m = new HashMap<String,Object>();
        SendResult<String, String> sendResult = null;
        try {
            sendResult = kafkaTemplate.send(topicName, message).get();
            /*检查recordMetadata的offset数据,不检查producerRecord*/
            if(sendResult!=null) {
                Long offsetIndex = sendResult.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    m.put("code", KafkaMesConstant.SUCCESS_CODE);
                    m.put("message", KafkaMesConstant.SUCCESS_MES);
                } else {
                    m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                }
            }else {
                m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
                m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
            }
        }  catch (InterruptedException e) {
            e.printStackTrace();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        } catch (ExecutionException e) {
            e.printStackTrace();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        }
        System.out.println("kafkaServers response : "+m);
    }
}
public class KafkaMesConstant {
    public static final String SUCCESS_CODE = "00000";
    public static final String SUCCESS_MES = "成功";

    /*kakfa-code*/
    public static final String KAFKA_SEND_ERROR_CODE = "30001";
    public static final String KAFKA_NO_RESULT_CODE = "30002";
    public static final String KAFKA_NO_OFFSET_CODE = "30003";

    /*kakfa-mes*/
    public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系liuhui";
    public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系liuhui";
    public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系liuhui";
}

测试一下

public class excuter {

    @Test
    public void producer() throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerClient producerClient = (ProducerClient) context.getBean("producerClient");
        producerClient.sendMessage("topic2", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()).toString());
        Thread.sleep(1000);
    }

}

控制台结果:(我这里没有使用日志输出,在实际开发中需要使用日志开发)

ProducerListener started
kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---RecordMetadata:topic2-2@4928
kafkaServers response : {code=00000, message=成功}

以上是关于如何保证kafka生产者发送消息的可靠性的主要内容,如果未能解决你的问题,请参考以下文章

Kafka架构组成及相关内容

Kafka架构组成及相关内容

Kafka生产者——结合spring开发

如何保证消息队列的可靠性传输?

Kafka - 消息排序保证

如何保证消息的可靠性传输