kafka生产者的发送消息的流程以及代码案例
Posted 健康平安的活着
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka生产者的发送消息的流程以及代码案例相关的知识,希望对你有一定的参考价值。
一 kafka发送消息流程
1.1 发送流程原理
kafka在发送消息的过程中,主要涉及两个线程main 线程和 Sender 线程。
在 main 线程 中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator。
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 。1.2 发送信息的时机
生产者何时向broker中发送信息。触发什么机制开始发送呢?这里有两种机制
1.batch.size:数据累计到batch.size之后,sender才会发送数据。
2.linger.ms:如果数据迟迟未达到batch.size,但是设置的linger.ms时间满足条件,则就进行发送数据,单位未ms,默认值是0ms;表示没有延迟。
1.3 生产者需要的配置参数说明
二 代码实操
2.1 异步演示发送代码
需求: 创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker。 确保kafka集群已经启动 1.代码如下:package com.ljf.spring.boot.demo.producer;
import com.ljf.spring.boot.demo.utils.DateUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.Properties;
/**
* @ClassName: producer
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/06 17:02:52
* @Version: V1.0
**/
public class producer
public static void main(String[] args)
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 100; i < 105; i++)
String v="kfdemo-xx"+i+"时间:"+DateUtils.dateToStr(new Date(),"yyyy-MM-dd HH:mm:ss");
kafkaProducer.send(new ProducerRecord<>("kafka-ljf",v));
// 5. 关闭资源
kafkaProducer.close();
System.out.println("执行完毕!!!");
2.启动consumer端
[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf --bootstrap-server 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
3.查看consumer的消费情况
2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元 数 据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发 送成功,如果 Exception 不为 null,说明消息发送失败。 1.代码:package com.ljf.spring.boot.demo.producer;
import com.ljf.spring.boot.demo.utils.DateUtils;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Date;
import java.util.Properties;
/**
* @ClassName: CustomProducerCallback
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/06 18:02:17
* @Version: V1.0
**/
public class CustomProducerCallback
public static void main(String[] args) throws InterruptedException
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 10; i++)
// 添加回调
String info="callback-xx"+i+" 时间为:"+DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
kafkaProducer.send(new ProducerRecord<>("kafka-ljf",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception)
if (exception == null)
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
else
// 出现异常打印
exception.printStackTrace();
);
// 延迟一会会看到数据发往不同分区
Thread.sleep(2000);
// 5. 关闭资源
kafkaProducer.close();
2.启动消费端
[root@localhost kafka_2.12-2.1.0]# b in/kafka-console-consumer.sh --from-beginning --topic kafka-ljf --bootstrap-server 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092 3.消费展示2.3 同步发送数据
只需在异步发送的基础上,再调用一下 get()方法即可。1.代码
package com.ljf.spring.boot.demo.producer;
import com.ljf.spring.boot.demo.utils.DateUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @ClassName: CustomProducerSync
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/06 18:07:34
* @Version: V1.0
**/
public class CustomProducerSync
public static void main(String[] args) throws ExecutionException, InterruptedException
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 10; i++)
// 异步发送 默认
// kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
// 同步发送,只需在异步发送的基础上,再调用一下 get()方法即可。
String info="tonbu-xx"+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
kafkaProducer.send(new ProducerRecord<>("kafka-ljf",info)).get();
// 5. 关闭资源
kafkaProducer.close();
System.out.println("jieshu");
2.启动消费者脚本
[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf --bootstrap-server 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
3.查看消费结果
以上是关于kafka生产者的发送消息的流程以及代码案例的主要内容,如果未能解决你的问题,请参考以下文章
大数据-消息队列-Kafka:Producer(生产者)发送消息采用的是异步发送两个线程:main线程和Sender线程线程共享变量:双端队列RecordAccumulator