kafka生产者的发送消息的流程以及代码案例

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka生产者的发送消息的流程以及代码案例相关的知识,希望对你有一定的参考价值。

一 kafka发送消息流程

1.1 发送流程原理

kafka在发送消息的过程中,主要涉及两个线程main 线程和 Sender 线程

main 线程 中创建了一个双端队列 RecordAccumulatormain 线程将消息发送给 RecordAccumulator。

Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

 1.2 发送信息的时机

生产者何时向broker中发送信息。触发什么机制开始发送呢?这里有两种机制

1.batch.size:数据累计到batch.size之后,sender才会发送数据。

2.linger.ms:如果数据迟迟未达到batch.size,但是设置的linger.ms时间满足条件,则就进行发送数据,单位未ms,默认值是0ms;表示没有延迟。

1.3 生产者需要的配置参数说明

 

 二 代码实操

2.1 异步演示发送代码

需求: 创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker。 确保kafka集群已经启动 1.代码如下:
package com.ljf.spring.boot.demo.producer;

import com.ljf.spring.boot.demo.utils.DateUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

/**
 * @ClassName: producer
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/06 17:02:52
 * @Version: V1.0
 **/
public class producer 
    public static void main(String[] args) 
      // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 100; i < 105; i++) 
            String v="kfdemo-xx"+i+"时间:"+DateUtils.dateToStr(new Date(),"yyyy-MM-dd HH:mm:ss");
            kafkaProducer.send(new ProducerRecord<>("kafka-ljf",v));
        
        // 5. 关闭资源
        kafkaProducer.close();
        System.out.println("执行完毕!!!");
    


2.启动consumer端

[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf  --bootstrap-server  192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092

3.查看consumer的消费情况

 2.2 带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元 据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发 送成功,如果 Exception 不为 null,说明消息发送失败。 1.代码:
package com.ljf.spring.boot.demo.producer;

import com.ljf.spring.boot.demo.utils.DateUtils;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;

/**
 * @ClassName: CustomProducerCallback
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/06 18:02:17
 * @Version: V1.0
 **/
public class CustomProducerCallback 
    public static void main(String[] args) throws InterruptedException 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 10; i++) 
            // 添加回调
            String info="callback-xx"+i+" 时间为:"+DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
            kafkaProducer.send(new ProducerRecord<>("kafka-ljf",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception exception) 
                    if (exception == null) 
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                     else 
                        // 出现异常打印
                        exception.printStackTrace();
                    
                
            );
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2000);
            
        // 5. 关闭资源
        kafkaProducer.close();
    

2.启动消费端

[root@localhost kafka_2.12-2.1.0]# b in/kafka-console-consumer.sh --from-beginning --topic kafka-ljf  --bootstrap-server  192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092 3.消费展示

 2.3 同步发送数据

只需在异步发送的基础上,再调用一下 get()方法即可。

1.代码

package com.ljf.spring.boot.demo.producer;

import com.ljf.spring.boot.demo.utils.DateUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @ClassName: CustomProducerSync
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/06 18:07:34
 * @Version: V1.0
 **/
public class CustomProducerSync 
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 10; i++) 
            // 异步发送 默认
             // kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
            // 同步发送,只需在异步发送的基础上,再调用一下 get()方法即可。
            String info="tonbu-xx"+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
            kafkaProducer.send(new ProducerRecord<>("kafka-ljf",info)).get();
        
        // 5. 关闭资源
        kafkaProducer.close();
        System.out.println("jieshu");
    
    

2.启动消费者脚本

[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf  --bootstrap-server  192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092​​​​​​​

3.查看消费结果

 

以上是关于kafka生产者的发送消息的流程以及代码案例的主要内容,如果未能解决你的问题,请参考以下文章

源码分析 Kafka 消息发送流程(文末附流程图)

源码分析 Kafka 消息发送流程(文末附流程图)

Kafka工作流程

大数据-消息队列-Kafka:Producer(生产者)发送消息采用的是异步发送两个线程:main线程和Sender线程线程共享变量:双端队列RecordAccumulator

kafka 生产者发送流程

Kafka生产者