kafka之消息生成者基本知识

Posted 长安不及十里

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka之消息生成者基本知识相关的知识,希望对你有一定的参考价值。

生产者

一 消息提供者开发

1.1 过程

1.2 代码实现

/**
 * @Author shu
 * @Date: 2021/10/22/ 16:25
 * @Description 生成者
 **/
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    //分组
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.设置参数
        Properties props = new Properties();
        //主机
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
        //分组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //ack默认
        props.put(ProducerConfig.ACKS_CONFIG,"1");
        //缓存区默认大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //拉取数据默认大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //如果数据未满16k,也提交
        props.put(ProducerConfig.LINGER_MS_CONFIG,10);
        //2.创建⽣产消息的客户端,传⼊参数
        Producer<String,String> producer = new KafkaProducer<String, String>(props);
        //3.创建消息
        //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
        ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"message","四川");
        //4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    }
}

1.3 重点配置参数

  • bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开。

  • key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在,key.serializervalue.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名。

  • ack:

    • 0 意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息。
    • 1意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。
    • -1意味着producer得到follwer确认,才发送下一条数据。
    • 综合性能与效率来看,kafka默认ack为1
  • buffer-memory:Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。默认32M。

  • batch-size:kafka一次拉取大小,默认16k

  • retries:重试次数

# lead机器
spring.kafka.bootstrap-servers=ip:9093
#########producer############
# ack
spring.kafka.producer.acks=1
# 拉取大小
spring.kafka.producer.batch-size=16384
# 重试次数
spring.kafka.producer.retries=10
# 缓冲区大小
spring.kafka.producer.buffer-memory=33554432
# 序列化
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

1.4 消息的发送

在创建完生产者实例之后,接下来的工作就是构建消息,即创建ProducerRecord对象。

		 //3.创建消息
        //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
        ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"message","四川");
  • ProducerRecord方法
	//成员变量
	private final String topic;//主题
    private final Integer partition;//分区
    private final Headers headers;//header
    private final K key;
    private final V value;
    private final Long timestamp;//时间

//构造器
 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

  public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }

 public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }

 public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

 public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    
 public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
  • KafkaProducer方法
//构造器
public KafkaProducer(final Map<String, Object> configs) {
        this(configs, null, null, null, null, null, Time.SYSTEM);
    }
    
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(configs, keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
    }

public KafkaProducer(Properties properties) {
        this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
    }


public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(propsToMap(properties), keySerializer, valueSerializer, null, null, null,
                Time.SYSTEM);
    }

//发送方法

@Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }


 @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

执行完send()方法之后直接调用get()方法,这样可以获取一个RecordMetadata对象,在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些信息。

 //4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset())

二 原理解析

消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)序列化器(Serializer)分区器(Partitioner)的一系列作用之后才能被真正地发往 broker

2.1基本知识

//成员变量
private final String clientId;
// Visible for testing
final Metrics metrics;
//分区器
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final ProducerMetadata metadata;
//累加器
private final RecordAccumulator accumulator;
//Sender线程
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
// 序列化器
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
//配置文件
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
//拦截器
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
//事务管理器
private final TransactionManager transactionManager;
  • 整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)

  • 在主线程中由KafkaProducer创建消息,然后通过可能的拦截器序列化器分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

  • RecordAccumulator主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

  • RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为 33554432B,即 32MB

  • 如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

  • 消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。

  • 不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。

  • ProducerBatch的大小和batch.size参数也有着密切的关系。

  • 在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建ProducerBatch

  • Sender线程从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List<ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。

  • 在转换成<Node,List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest

  • 请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String 类型,表示节点的 id 编号),可以限制连接大小。

  • InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。优先发送负载最小的,避免因网络拥塞等异常而影响整体的进度。

2.2 拦截器

2.2.1 基本结构

  • 拦截器(Interceptor)是早在Kafka 0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器
  • 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
    }

  • configure(Map<String, ?> configs)
    该方法在初始化数据的时候被调用,用于获取生产者的配置信息
  • onSend(ProducerRecord<K, V>)
    该方法在消息被序列化之前调用,并传入要发送的消息记录。用户可以在该方法中对消息记录进行任意的修改,包括消息的key和value以及要发送的主题和分区等。
/**
当客户端将记录发送到 KafkaProducer 时,在键和值被序列化之前调用。 该方法调用ProducerInterceptor.onSend(ProducerRecord)方法。 从第一个拦截器的 onSend() 返回的 ProducerRecord 传递给第二个拦截器 onSend(),在拦截器链中依此类推。 从最后一个拦截器返回的记录就是从这个方法返回的。 此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。 如果链中间的拦截器(通常会修改记录)抛出异常,则链中的下一个拦截器将使用前一个未抛出异常的拦截器返回的记录调用。 
**/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;//集合
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
  • onAcknowledgement(RecordMetadata metadata, Exception exception)
    该方法在发送到服务器的记录已被确认或者记录发送失败时调用(在生产者回调逻辑触发之前),可以在metadata对象中获取消息的主题、分区和偏移量等信息,在exception对象中获取消息的异常信息。

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        int partition = metadata.partition();
        String topic = metadata.topic();
        long offset = metadata.offset();
        String str = metadata.toString();
        long timestamp = metadata.timestamp();
    }
    
  • close()该方法用于关闭拦截器并释放资源。当生产者关闭时将调用该方法。

2.2.2 自定义拦截器

package com.Interceptor;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * @Author shu
 * @Date: 2021/10/30/ 11:40
 * @Description 时间拦截器,发送消息之前,在消息内容前面加入时间戳
 **/
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /**
     * 获取生产者配置信息
     */
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    }

    以上是关于kafka之消息生成者基本知识的主要内容,如果未能解决你的问题,请参考以下文章

1.kafka的基本知识

1.kafka的基本知识

Kafka-文件管理

Kafka和RabbitMQ比较之基础知识部分

MQ之Kafka

Kafka--05---java客户端代码实现