kafka基础篇——kafka生产者客户端

Posted 敲代码的小小酥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基础篇——kafka生产者客户端相关的知识,希望对你有一定的参考价值。

前言

在第一节中我们讲到,在kafka服务端我们可以通过命令创建生产者并发送消息。但是在实际开发中,我们都是以java形式在项目中进行生产者的创建和消息的发送。本节我们基于JAVA API的基础讲解kafka生产者。

一、JAVA API调用kafka生产者入门

先上代码,看java如何创建生产者并发送消息。
首先,在maven工程的pom中引入kafka客户端jar包,如下图:
在这里插入图片描述
我们这里讲解的是2.30版本,所以jar包也选2.3版本。
然后创建生产者类,给kafka服务发送消息,代码如下:

public class HelloKafkaProducer {
    public static void main(String[] args) {
        //创建Properties集合,设定并存放kafka生产者的属性
        Properties properties = new Properties();
        //kafka服务地址,集群环境可以设置多个,用逗号隔开
        properties.put("bootstrap.servers","127.0.0.1:9092");
        //kafka接收消息只认识字节数组,定义消息的key的序列化器和value的序列化器,将其转化成字节数组
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       //创建kafka生产者对象,并将属性传给生产者
        try (KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);){
            //生产者的消息存放在ProducerRecord对象中
            ProducerRecord<String,String> record;
            try {
                //TODO 发送4条消息
                for(int i=0;i<4;i++){
                    //设定消息发送的主题,消息的key值和消息的内容
                    record = new ProducerRecord<String,String>("Hello World", null,"lison");
                    //发送消息
                    producer.send(record);
                    System.out.println(i+",message is sent");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } 
    }



}

运行代码,消息就发送到了kafka服务端,主题在kafka服务端原本没有,是新建的主题。下面我们就上面这段代码,进一步进行深入的理解和分析。

二、生产者的属性

上面的入门代码中我们可以看到,需要对生产者设置一些属性,比如kafka服务地址,序列化器等。下面我们总结一下生产者的属性都有哪些。
在kafka的JAVA API中,提供了ProducerConfig类,这个类里,定义了生产者的所有属性。我们看其中几个重点属性:

  • acks:0,1,all,-1
    消息确认机制。默认值为1。在ProducerConfig类里,有对acks机制的解释:
    在这里插入图片描述
    ACKS_DOC就是对acks属性的解释,翻译后的解释如下:
    在请求完成之前,生产者要求首领收到的确认数。这将控制发送的记录的持久性。前面的知识我们讲到,生产者是与分区首领进行交互的,所以,需要分区首领去确认数。
    我们继续往下翻译:
    允许以下设置:
  • **acks=0**如果设置为零,则生产者根本不会等待来自服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下,无法保证服务器已接收到记录,重试配置将不会生效(因为客户端通常不知道任何失败)。为每条记录返回的偏移量将始终设置为-1
  • **acks=1**这意味着首领会将记录写入本地日志,但不会等待所有追随者副本的完全确认。在这种情况下,如果领导者在确认记录之后但在追随者复制它之前立即失败,那么记录将丢失
  • acks=all这意味着领导者将等待全套同步副本确认记录。这保证了只要至少有一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。”

由上面的翻译我们知道,acks为0时,不管kafka服务的分区首领是否接收成功,发了就行。且acks为0时,kafka的重试机制也就不用了。acks为1时,确认首领成功接收了消息。至于该条消息是否被其他的副本成功复制,生产者不关心。根据我们前面集群讲到的知识可知,如果该条消息在首领中给其他副本复制时失败了,那么该条消息就不会被消费者消费了。acks为all或-1时,是分区以及其他副本都收到了消息,生产者才会认为是消息发送成功了,这是最安全的机制,但是该机制也比较影响性能。

  • batch.size
    当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存 被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省 16384(16k) ,如果一条消息超过了批次的大小,会写不进去。

  • linger.ms
    指定了生产者在发送批次前等待更多消息加入批次的时间, 缺省0 50ms 0就是不按批次走,有就发。

  • max.request.size
    控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系,设置成一致即可)

  • buffer.memory
    生产者内存缓冲区大小

  • retries
    消息发送失败后的重试次数,缺省 Integer.MAX_VALUE。默认情况下,生产者在每次重试之间等待 100ms,可以通过参数 retry.backoff.ms 参数来改变这个时间间隔。

  • request.timeout.ms
    客户端将等待请求的响应的最大时间 默认30秒

  • max.block.ms
    最大阻塞时间,超过则抛出异常 缺省60000ms

  • compression.type
    于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
    压缩的是使用时间换空间的思想,具体来说就是使用CPU的时间去换取空间或网络I/0传输量。

三、序列化器

kafka服务接收字节数组,所以在生产者中,需要制定消息的key和value的序列化器。主要实现org.apache.kafka.common.serialization.Serializer接口的类,都可以作为kafka的序列化器。
我们看Serializer接口的源码:

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

由源码可知,最终序列化的返回值是byte[]。
我们看kafka的JAVA API给我们提供了哪些序列化实现类。
在这里插入图片描述
kafka提供了很多序列化器,且序列化器都对应着一个反序列化器。我们看几个实用的序列化器。

  • StringSerializer
    顾名思义,这是序列化字符串的序列化器。我们看StringSerializer源码中的serialize方法:
 public byte[] serialize(String topic, String data) {
        try {
            return data == null ? null : data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        }
    }

我们可以看到,其就是调用了String的getBytes方法,返回成byte[]类型。对于简单的字符串而言,我们无需多说,对于对象而言,我们可以将其转化成json字符串,然后用StringSerializer序列化器发送到kafka服务端去。

  • UUIDSerializer
    我们看UUID序列化器的serialize方法:
public byte[] serialize(String topic, UUID data) {
        try {
            return data == null ? null : data.toString().getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + this.encoding);
        }
    }

可以看到,我们直接传入UUID对象即可。该序列化器用于发送业务数据的唯一id时使用。

  • JsonSerializer
    json序列化器,序列化jackson形式的json对象。我们看其源码:
public byte[] serialize(String topic, JsonNode data) {
        if (data == null) {
            return null;
        } else {
            try {
                return this.objectMapper.writeValueAsBytes(data);
            } catch (Exception var4) {
                throw new SerializationException("Error serializing JSON message", var4);
            }
        }
    }

可以看到,需要传入一个JsonNode对象,其是jackson工具的一个类。jackson是一个json处理工具,这里我们不进行讲解。
此外,kafka还提供了byte,short,double,long等基本类型的序列化器,供我们使用。
自定义序列化器:
我们可以自己定义序列化器,制定序列化规则,只要实现了Serializer接口即可。同时我们还需要制定反序列化器,按照我们事先约定好的规则,进行反序列化。一般情况下,示例如下:
我们先定义一个自定义类,将其进行序列化:

public class DemoUser {
    private int id;
    private String name;

    public DemoUser(int id) {
        this.id = id;
    }

    public DemoUser(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "DemoUser{" +
                "id=" + id +
                ", name='" + name + '\\'' +
                '}';
    }
}

然后我们制定该类的序列化器:

public class SelfSerializer implements Serializer<DemoUser> {
    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    /**
     * 序列化器最终就是返回byte数组,里面的规则,我们可以自己制定,然后反序列化的时候,按照设定的规则去反就行
     * @param topic
     * @param data
     * @return
     */
    public byte[] serialize(String topic, DemoUser data) {
        try {
            byte[] name;
            int nameSize;
            if(data==null){
                return null;
            }
            if(data.getName()!=null){
                name = data.getName().getBytes("UTF-8");
                //字符串的长度
                nameSize = data.getName().length();
            }else{
                name = new byte[0];
                nameSize = 0;
            }
            /*id的长度4个字节,字符串的长度描述4个字节,
            字符串本身的长度nameSize个字节*/
            ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
            buffer.putInt(data.getId());//4
            buffer.putInt(nameSize);//4
            buffer.put(name);//nameSize
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error serialize DemoUser:"+e);
        }
    }

    public void close() {
        //do nothing
    }
}

然后我们需要定义反序列化器:

public class SelfDeserializer implements Deserializer<DemoUser> {


    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public DemoUser deserialize(String topic, byte[] data) {
        try {
            if(data==null){
                return null;
            }
            if(data.length<8){
                throw new SerializationException("Error data size.");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id;
            String name;
            int nameSize;
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameByte = new byte[nameSize];
            buffer.get(nameByte);
            name = new String(nameByte,"UTF-8");
            return new DemoUser(id,name);
        } catch (Exception e) {
            throw new SerializationException("Error Deserializer DemoUser."+e);
        }

    }

    public void close() {
        //do nothing
    }
}

这样,就实现了我们对自定义类的自定义序列化需求。但是kafka本身为我们提供的序列化器绝大多数情况下都能满足我们的需求,我们尽量避免自定义序列化器。因为kafka提供的序列化器肯定要比我们自定义的安全。

四、消息的key与分区的映射关系

在上面的代码中我们可以看到,在创建ProducerRecord对象时,有3个参数:

 record = new ProducerRecord<String,String>("Hello World", null,"lison");

其中第一个参数是主题,第二个参数是消息的key值,第三个参数的消息的value值。在前面我们介绍过,kafka的主题是由一个或多个分区构成的。默认情况下,kafka均衡的将消息分散在各个分区中。其应用的也是最少使用原则。哪个分区的消息少,就往哪个分区中发送消息。当我们给消息设置key值时,也就是第二个参数,那么同一个key值得消息,会发送给同一个分区,前提条件是分区固定。一旦分区数量发生了改变,就无法保证同一个key的消息都在同一个分区了。所以,在创建主题时,要规划好分区数量,我们在消费者会讲到分区再均衡,它也跟规划分区有关,总之,分区事先规划好很重要。
自定义分区器:
kafka客户端提供了Partitioner接口,该接口是分区器的规范。上面我们提到kafka默认情况是均衡的将消息发送到分区上,这种机制其实就是Partitioner的实现类去实现的。我们看一下kafka提供的默认分区器的源码:

public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public DefaultPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }
//这里定义分区规则
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       //列出主题的所有分区
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //如果key值为空,则进行均匀散列
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {//如果key值不为空,则按固定算法计算分区。所以,key值相同的消息进入了同要给分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    public void close() {
    }
}

我们也可以自定义分区,按照我们的规则将消息发送到不同的分区,比如:指定的消息发送到指定的分区中。大家可以自行手写一个自定义分区哦。
需要注意的是,要使用自定义分区,生产者的属性里,需要设置ProducerConfig.PARTITIONER_CLASS_CONFIG属性,即"partitioner.class"属性,值为我们自定义分区器的包名路径。

五、生产者的三种发送方式

  • 发送并忘记
    在上面的入门代码中,我们可以看到生产者直接调用了send方法,没有接收任何的返回值,这就是发送并忘记模式。该模式把消息发送出去就不管了,不管kafka服务是否接收消息成功。对于不重要的数据而言,我们可以采用该种方式。因为kafka有重试机制,对于一次没发送成功的数据,可以重试发送,所以,消息丢失的概率会减少,对于可接受丢失数据的业务而言,可以使用该模式。
  • 同步发送
    send方法是有返回值的,当我们忽略send方法返回值时,就是采用发送并忘记模式。send方法的返回值是Future对象,同步发送就是获取返回值Future对象,并在后面的代码中调用Future的get方法,将线程阻塞。待消息发送成功后,线程才往下执行。代码如下:
public class KafkaFutureProducer {

    private static KafkaProducer<String,String> producer = null;

    public static void main(String[] args) {
        public static void main (String[]args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "39.100.116.73:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(properties);
            try {
                ProducerRecord<String, String> record;
                try {
                    record = new ProducerRecord<String, String>(
                            "Hello World", null, "同步发送");
                    //获取send方法的返回值,RecordMetadata对象是kafka服务收到消息后返回给生产者的消息
                    Future<RecordMetadata> future = producer.send(record);
                    System.out.println("这里可以处理其他业务");
                    //阻塞线程,直到kafka服务返回RecordMetadata数据
                    RecordMetadata recordMetadata = future.get();//阻塞在这个位置
                    if (null != recordMetadata) {
                        //RecordMetadata对象中包含了消息在kafka服务中的偏移量,分区等信息
                        System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition());
                    }

                } catch (Exception e) {
                 //如果发送失败,则抛出异常,进入catch语句,我们根据实际业务处理该异常
                    e.printStackTrace();
                }

            } finally {
                producer.close();
            }
        }


    }
}

以上是关于kafka基础篇——kafka生产者客户端的主要内容,如果未能解决你的问题,请参考以下文章

kafka基础篇——kafka总结

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

kafka基础篇——kafka消费者客户端

Kafka原理分析之基础篇