Kafka生产者——结合spring开发

Posted 6。

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者——结合spring开发相关的知识,希望对你有一定的参考价值。

Kafka生产者端

可靠性保证:

producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据:

1、不丢失

2、不重复

producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性。

acks 参数配置

为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到 ack,就会进行下一轮的发送,否则重新发送数据。

0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;

-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前leader 发生故障,那么会造成数据重复。

Exactly Once 语义

当ack级别设置为-1的时候,可以保证producer到broker之间不会丢失数据,即At
Least Once 语义 。相对的,将服务器ack级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once 语义 。

At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。

对于一些重要信息,我们要求既不能重复也不能丢失,这时我们需要使用Exactly Once 语义 。0.11 版本的 Kafka,引入了一项重大特性:幂等性。 所谓幂等性就是producer无论向broker发送了多少次重复数据,broker都只会持久化一条。幂等性结合At Least Once语义,就结合成了Kafka的Exactly Once语义。
At Least Once + 幂等性 = Exactly Once
启动幂等性,只需要将Producer的参数enable.idompotence 设置为true,ack设置为-1即可。

开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个分区的消息会附带Sequence Number(自动增长)。Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息提交的时候,broker只会持久化一条消息。
msg1<PID:1,Partition:1,SeqNumber:0,message : a >
msg2<PID:1,Partition:1,SeqNumber:1,message : b >
msg2<PID:1,Partition:1,SeqNumber:2,message : c >

但是,PID重启就会变化,同时不同分区也会有不同主键,所以幂等性无法保证跨分区跨会话。这里我们就需要引进kafka事务。

事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。为了实现跨分区跨会话事务,引入一个全局唯一的Transaction id ,将pproducer的pid和Transaction id进行绑定,这样,当producer重启后,就可以通过Transaction ID 获得原来的 PID。这个参数通过客户端程序来进行设定 。

我们使用kafka消息事务的场景有以下两种:

  1. 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
  2. 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。

事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时
需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设
置为false,则会抛出异常。


以上是保证producer发送数据可靠性保证的相关参数,结合spring-kafka的具体使用如下。

spring-kafka生产端

spring-kafkaProducer.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
    <context:component-scan base-package="producer" />
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--broker集群地址-->
                <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                <!--acks 参数配置-->
                <entry key="acks" value="all"/>
                <!--发送失败重试次数-->
                <entry key ="retries" value="3"/>
                <!--批次发送大小的内存阀值-->
                <entry key="batch.size" value="16384"/>
                <!--批处理延迟时间上限-->
                <entry key="linger.ms" value="1"/>
                <!--开启幂等性-->
                <entry key="enable.idempotence" value="true"/>
                <!--批处理缓冲区-->
                <entry key="buffer.memory" value="33554432"/>
                <!--key序列化器-->
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--value序列化器-->
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!--配置一个生产者监听器,在该类写发送成功或失败的回调方法-->
    <bean id="producerLisener" class="producer.KafkaSendResultHandler"></bean>
    <!--springkafka提供的发送类,对kafka发送方法进行加强性的封装-->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="myTopic"/>
        <property name="producerListener" ref="producerLisener"></property>
    </bean>
    <!--producer工厂-->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>
</beans>

部分重要参数详解:

acks:

? 这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功
的。

  • ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知
    不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最
    大速度发送消息,从而达到很高的吞吐量。

  • ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响
    应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收
    到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,
    如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导
    致数据丢失。

  • ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,
    这种模式是最安全的,它可以保证不止一个服务器收到消息。

    注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常

retries :

? 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到
了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待
100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。

batch.size :

? 当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可
以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出
去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能
被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置
的太小,生产者会因为频繁发送消息而增加一些额外的开销。

max.request.size :

? 该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里
所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两
边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。

linger.ms:批处理延迟时间上限

buffer.memory:批处理缓冲区

enable.idempotence:是否开启幂等性

ProducerListener类

消息发送后的回调方法,注意的是,这里的监听回显的数据时要发送的数据,不是返回的数据,可以通过日志来观察发送数据是否正确。

public class KafkaSendResultHandler implements ProducerListener {
   private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        log.info("kafka message send successful : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value+"---RecordMetadata:"+recordMetadata);
    }

    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        log.error("kafka message send fail : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value);
        e.printStackTrace();
    }

    public boolean isInterestedInSuccess() {
        log.info("ProducerListener started");
        return true;
    }
}

ProducerClient类

对kafkaTemplate的再一次封装,kafka在消息发送的时候发送方式可以分为同步发送和异步发送。

同步发送:

? 同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。

  //同步发送
   public void syncSend(){
    testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);
   }
    

异步发送:

//异步发送
   public void asyncSend() {

      ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());

      future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                    @Override
                    public void onSuccess(SendResult<Integer, String> result) {
                       log.info("success");
                    }
                    @Override
                    public void onFailure(Throwable ex) {
                       log.error("failure");
                    }
                });
}

ProducerClient对kafkaTemplate的封装(不带事务)

这里只封装了最简单的发送方法,同时可对其他发送方法进行封装,只需要修改传参即可。

public class ProducerClient {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    /*同步发送*/
    //轮询方式发送
    public void sendMessage(String topicName,String message){
        Map<String,Object> m = new HashMap<String,Object>();
        SendResult<String, String> sendResult = null;
        try {
            sendResult = kafkaTemplate.send(topicName, message).get();
            /*检查recordMetadata的offset数据,不检查producerRecord*/
            if(sendResult!=null) {
                Long offsetIndex = sendResult.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    m.put("code", KafkaMesConstant.SUCCESS_CODE);
                    m.put("message", KafkaMesConstant.SUCCESS_MES);
                } else {
                    m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                }
            }else {
                m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
                m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
            }
        }  catch (InterruptedException e) {
            e.printStackTrace();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        } catch (ExecutionException e) {
            e.printStackTrace();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        }
        System.out.println("kafkaServers response : "+m);
    }
}
public class KafkaMesConstant {
    public static final String SUCCESS_CODE = "00000";
    public static final String SUCCESS_MES = "成功";

    /*kakfa-code*/
    public static final String KAFKA_SEND_ERROR_CODE = "30001";
    public static final String KAFKA_NO_RESULT_CODE = "30002";
    public static final String KAFKA_NO_OFFSET_CODE = "30003";

    /*kakfa-mes*/
    public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系liuhui";
    public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系liuhui";
    public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系liuhui";
}

测试一下

public class excuter {

    @Test
    public void producer() throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerClient producerClient = (ProducerClient) context.getBean("producerClient");
        producerClient.sendMessage("topic2", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()).toString());
        Thread.sleep(1000);
    }

}

控制台结果:(我这里没有使用日志输出,在实际开发中需要使用日志开发)

ProducerListener started
kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---RecordMetadata:topic2-2@4928
kafkaServers response : {code=00000, message=成功}

以上是关于Kafka生产者——结合spring开发的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot - 发布者无法在 Kafka 上发布消息

spring kafka中是不是有多个生产者的代码示例?

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

spring kafka 参数说明

spring kafka producer 生产者

kafka