kafka基础篇——kafka生产者客户端
Posted 敲代码的小小酥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基础篇——kafka生产者客户端相关的知识,希望对你有一定的参考价值。
前言
在第一节中我们讲到,在kafka服务端我们可以通过命令创建生产者并发送消息。但是在实际开发中,我们都是以java形式在项目中进行生产者的创建和消息的发送。本节我们基于JAVA API的基础讲解kafka生产者。
一、JAVA API调用kafka生产者入门
先上代码,看java如何创建生产者并发送消息。
首先,在maven工程的pom中引入kafka客户端jar包,如下图:
我们这里讲解的是2.30版本,所以jar包也选2.3版本。
然后创建生产者类,给kafka服务发送消息,代码如下:
public class HelloKafkaProducer {
public static void main(String[] args) {
//创建Properties集合,设定并存放kafka生产者的属性
Properties properties = new Properties();
//kafka服务地址,集群环境可以设置多个,用逗号隔开
properties.put("bootstrap.servers","127.0.0.1:9092");
//kafka接收消息只认识字节数组,定义消息的key的序列化器和value的序列化器,将其转化成字节数组
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建kafka生产者对象,并将属性传给生产者
try (KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);){
//生产者的消息存放在ProducerRecord对象中
ProducerRecord<String,String> record;
try {
//TODO 发送4条消息
for(int i=0;i<4;i++){
//设定消息发送的主题,消息的key值和消息的内容
record = new ProducerRecord<String,String>("Hello World", null,"lison");
//发送消息
producer.send(record);
System.out.println(i+",message is sent");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行代码,消息就发送到了kafka服务端,主题在kafka服务端原本没有,是新建的主题。下面我们就上面这段代码,进一步进行深入的理解和分析。
二、生产者的属性
上面的入门代码中我们可以看到,需要对生产者设置一些属性,比如kafka服务地址,序列化器等。下面我们总结一下生产者的属性都有哪些。
在kafka的JAVA API中,提供了ProducerConfig类,这个类里,定义了生产者的所有属性。我们看其中几个重点属性:
- acks:0,1,all,-1
消息确认机制。默认值为1。在ProducerConfig类里,有对acks机制的解释:
ACKS_DOC就是对acks属性的解释,翻译后的解释如下:
在请求完成之前,生产者要求首领收到的确认数。这将控制发送的记录的持久性。前面的知识我们讲到,生产者是与分区首领进行交互的,所以,需要分区首领去确认数。
我们继续往下翻译:
允许以下设置:
**acks=0**
如果设置为零,则生产者根本不会等待来自服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下,无法保证服务器已接收到记录,重试
配置将不会生效(因为客户端通常不知道任何失败)。为每条记录返回的偏移量将始终设置为-1**acks=1**
这意味着首领会将记录写入本地日志,但不会等待所有追随者副本的完全确认。在这种情况下,如果领导者在确认记录之后但在追随者复制它之前立即失败,那么记录将丢失acks=all
这意味着领导者将等待全套同步副本确认记录。这保证了只要至少有一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。”
由上面的翻译我们知道,acks为0时,不管kafka服务的分区首领是否接收成功,发了就行。且acks为0时,kafka的重试机制也就不用了。acks为1时,确认首领成功接收了消息。至于该条消息是否被其他的副本成功复制,生产者不关心。根据我们前面集群讲到的知识可知,如果该条消息在首领中给其他副本复制时失败了,那么该条消息就不会被消费者消费了。acks为all或-1时,是分区以及其他副本都收到了消息,生产者才会认为是消息发送成功了,这是最安全的机制,但是该机制也比较影响性能。
-
batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存 被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省 16384(16k) ,如果一条消息超过了批次的大小,会写不进去。 -
linger.ms
指定了生产者在发送批次前等待更多消息加入批次的时间, 缺省0 50ms 0就是不按批次走,有就发。 -
max.request.size
控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系,设置成一致即可) -
buffer.memory
生产者内存缓冲区大小 -
retries
消息发送失败后的重试次数,缺省 Integer.MAX_VALUE。默认情况下,生产者在每次重试之间等待 100ms,可以通过参数 retry.backoff.ms 参数来改变这个时间间隔。 -
request.timeout.ms
客户端将等待请求的响应的最大时间 默认30秒 -
max.block.ms
最大阻塞时间,超过则抛出异常 缺省60000ms -
compression.type
于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
压缩的是使用时间换空间的思想,具体来说就是使用CPU的时间去换取空间或网络I/0传输量。
三、序列化器
kafka服务接收字节数组,所以在生产者中,需要制定消息的key和value的序列化器。主要实现org.apache.kafka.common.serialization.Serializer接口的类,都可以作为kafka的序列化器。
我们看Serializer接口的源码:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
byte[] serialize(String var1, T var2);
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}
由源码可知,最终序列化的返回值是byte[]。
我们看kafka的JAVA API给我们提供了哪些序列化实现类。
kafka提供了很多序列化器,且序列化器都对应着一个反序列化器。我们看几个实用的序列化器。
- StringSerializer
顾名思义,这是序列化字符串的序列化器。我们看StringSerializer源码中的serialize方法:
public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}
我们可以看到,其就是调用了String的getBytes方法,返回成byte[]类型。对于简单的字符串而言,我们无需多说,对于对象而言,我们可以将其转化成json字符串,然后用StringSerializer序列化器发送到kafka服务端去。
- UUIDSerializer
我们看UUID序列化器的serialize方法:
public byte[] serialize(String topic, UUID data) {
try {
return data == null ? null : data.toString().getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + this.encoding);
}
}
可以看到,我们直接传入UUID对象即可。该序列化器用于发送业务数据的唯一id时使用。
- JsonSerializer
json序列化器,序列化jackson形式的json对象。我们看其源码:
public byte[] serialize(String topic, JsonNode data) {
if (data == null) {
return null;
} else {
try {
return this.objectMapper.writeValueAsBytes(data);
} catch (Exception var4) {
throw new SerializationException("Error serializing JSON message", var4);
}
}
}
可以看到,需要传入一个JsonNode对象,其是jackson工具的一个类。jackson是一个json处理工具,这里我们不进行讲解。
此外,kafka还提供了byte,short,double,long等基本类型的序列化器,供我们使用。
自定义序列化器:
我们可以自己定义序列化器,制定序列化规则,只要实现了Serializer接口即可。同时我们还需要制定反序列化器,按照我们事先约定好的规则,进行反序列化。一般情况下,示例如下:
我们先定义一个自定义类,将其进行序列化:
public class DemoUser {
private int id;
private String name;
public DemoUser(int id) {
this.id = id;
}
public DemoUser(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "DemoUser{" +
"id=" + id +
", name='" + name + '\\'' +
'}';
}
}
然后我们制定该类的序列化器:
public class SelfSerializer implements Serializer<DemoUser> {
public void configure(Map<String, ?> configs, boolean isKey) {
//do nothing
}
/**
* 序列化器最终就是返回byte数组,里面的规则,我们可以自己制定,然后反序列化的时候,按照设定的规则去反就行
* @param topic
* @param data
* @return
*/
public byte[] serialize(String topic, DemoUser data) {
try {
byte[] name;
int nameSize;
if(data==null){
return null;
}
if(data.getName()!=null){
name = data.getName().getBytes("UTF-8");
//字符串的长度
nameSize = data.getName().length();
}else{
name = new byte[0];
nameSize = 0;
}
/*id的长度4个字节,字符串的长度描述4个字节,
字符串本身的长度nameSize个字节*/
ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
buffer.putInt(data.getId());//4
buffer.putInt(nameSize);//4
buffer.put(name);//nameSize
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error serialize DemoUser:"+e);
}
}
public void close() {
//do nothing
}
}
然后我们需要定义反序列化器:
public class SelfDeserializer implements Deserializer<DemoUser> {
public void configure(Map<String, ?> configs, boolean isKey) {
//do nothing
}
public DemoUser deserialize(String topic, byte[] data) {
try {
if(data==null){
return null;
}
if(data.length<8){
throw new SerializationException("Error data size.");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int id;
String name;
int nameSize;
id = buffer.getInt();
nameSize = buffer.getInt();
byte[] nameByte = new byte[nameSize];
buffer.get(nameByte);
name = new String(nameByte,"UTF-8");
return new DemoUser(id,name);
} catch (Exception e) {
throw new SerializationException("Error Deserializer DemoUser."+e);
}
}
public void close() {
//do nothing
}
}
这样,就实现了我们对自定义类的自定义序列化需求。但是kafka本身为我们提供的序列化器绝大多数情况下都能满足我们的需求,我们尽量避免自定义序列化器。因为kafka提供的序列化器肯定要比我们自定义的安全。
四、消息的key与分区的映射关系
在上面的代码中我们可以看到,在创建ProducerRecord对象时,有3个参数:
record = new ProducerRecord<String,String>("Hello World", null,"lison");
其中第一个参数是主题,第二个参数是消息的key值,第三个参数的消息的value值。在前面我们介绍过,kafka的主题是由一个或多个分区构成的。默认情况下,kafka均衡的将消息分散在各个分区中。其应用的也是最少使用原则。哪个分区的消息少,就往哪个分区中发送消息。当我们给消息设置key值时,也就是第二个参数,那么同一个key值得消息,会发送给同一个分区,前提条件是分区固定。一旦分区数量发生了改变,就无法保证同一个key的消息都在同一个分区了。所以,在创建主题时,要规划好分区数量,我们在消费者会讲到分区再均衡,它也跟规划分区有关,总之,分区事先规划好很重要。
自定义分区器:
kafka客户端提供了Partitioner接口,该接口是分区器的规范。上面我们提到kafka默认情况是均衡的将消息发送到分区上,这种机制其实就是Partitioner的实现类去实现的。我们看一下kafka提供的默认分区器的源码:
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public DefaultPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
//这里定义分区规则
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//列出主题的所有分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//如果key值为空,则进行均匀散列
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {//如果key值不为空,则按固定算法计算分区。所以,key值相同的消息进入了同要给分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {
}
}
我们也可以自定义分区,按照我们的规则将消息发送到不同的分区,比如:指定的消息发送到指定的分区中。大家可以自行手写一个自定义分区哦。
需要注意的是,要使用自定义分区,生产者的属性里,需要设置ProducerConfig.PARTITIONER_CLASS_CONFIG属性,即"partitioner.class"属性,值为我们自定义分区器的包名路径。
五、生产者的三种发送方式
- 发送并忘记
在上面的入门代码中,我们可以看到生产者直接调用了send方法,没有接收任何的返回值,这就是发送并忘记模式。该模式把消息发送出去就不管了,不管kafka服务是否接收消息成功。对于不重要的数据而言,我们可以采用该种方式。因为kafka有重试机制,对于一次没发送成功的数据,可以重试发送,所以,消息丢失的概率会减少,对于可接受丢失数据的业务而言,可以使用该模式。 - 同步发送
send方法是有返回值的,当我们忽略send方法返回值时,就是采用发送并忘记模式。send方法的返回值是Future对象,同步发送就是获取返回值Future对象,并在后面的代码中调用Future的get方法,将线程阻塞。待消息发送成功后,线程才往下执行。代码如下:
public class KafkaFutureProducer {
private static KafkaProducer<String,String> producer = null;
public static void main(String[] args) {
public static void main (String[]args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "39.100.116.73:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(properties);
try {
ProducerRecord<String, String> record;
try {
record = new ProducerRecord<String, String>(
"Hello World", null, "同步发送");
//获取send方法的返回值,RecordMetadata对象是kafka服务收到消息后返回给生产者的消息
Future<RecordMetadata> future = producer.send(record);
System.out.println("这里可以处理其他业务");
//阻塞线程,直到kafka服务返回RecordMetadata数据
RecordMetadata recordMetadata = future.get();//阻塞在这个位置
if (null != recordMetadata) {
//RecordMetadata对象中包含了消息在kafka服务中的偏移量,分区等信息
System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition());
}
} catch (Exception e) {
//如果发送失败,则抛出异常,进入catch语句,我们根据实际业务处理该异常
e.printStackTrace();
}
} finally {
producer.close();
}
}
}
}
以上是关于kafka基础篇——kafka生产者客户端的主要内容,如果未能解决你的问题,请参考以下文章
Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务
Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务