kafka的生产者的分区策略

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka的生产者的分区策略相关的知识,希望对你有一定的参考价值。

一 kafka分区策略

1.1 分区的好处

分区的好处:便于将数据进行合理存储,将数据分割成一块一块的存储到不同的broker中。

提高并行度:生产者按分区为单位进行生产,消费者按分区为单位进行消费

1.2 分区的策略

1.给定了分区号,直接将数据发送到指定的分区里面去。

2.没有给定分区号,通过key的hashcode,和 topic的分区数,进行取模。

例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

3.既没有给定分区号,也没有给定key值,直接轮循进行分区。

Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

4.自定义分区。根据业务需要制定以分区策略。

二  具体案例分析

2.1 制定分区

将数据发往指定 partition 的情况 下,例如,将所有数据发往分区 1 中。 1.代码
 /**
    * @author liujianfu
    * @description       制定分区去生产数据
    * @date 2022/4/7 10:08
    * @param []
    * @return void
    */
    public static  void zhidingPartion() throws InterruptedException 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 10; i++) 
            // 添加回调
            String info="4月5号-xx"+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
            kafkaProducer.send(new ProducerRecord<>("kafka-ljf",1,"test",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception exception) 
                    if (exception == null) 
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                     else 
                        // 出现异常打印
                        exception.printStackTrace();
                    
                
            );
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2000);
        
        // 5. 关闭资源
        kafkaProducer.close();
    

2.消费端进行消费 [root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf  --bootstrap-server  192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
 

3.console控制台打印日志

 2.2 key值取模

没有指明 partition 值但有 key 的情况下 ,将 key hash 值与 topic partition 数进行取 余得到 partition 值。 1.代码
 /**
    * @author liujianfu
    * @description       分区取模
    * @date 2022/4/7 10:18
    * @param []
    * @return void
    */
    public static void qumoPartition() throws InterruptedException 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 10; i++) 
            // 添加回调
            String info="4月5号-qumo"+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
            int partition=0;
            if(i%2==0)
                partition=1;
            
            kafkaProducer.send(new ProducerRecord<>("kafka-ljf",partition,"test",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception exception) 
                    if (exception == null) 
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                     else 
                        // 出现异常打印
                        exception.printStackTrace();
                    
                
            );
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2000);
        
        // 5. 关闭资源
        kafkaProducer.close();
    

2.消费端进行消费

[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf  --bootstrap-server  192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092

3.console消费情况:有数据发往分区0,有的数据发往分区1

 2.3  制定规则进行分区

1要求:我们实现一个分区器实现,发送过来的数据中如果包含 xinxiang,就发往 0 号分区,

不包含 atguigu ,就发往 1 号分区。 2.编写自定义分区器 1 )定义类实现 Partitioner 接口。 (2)重写 partition() 方法。
package com.ljf.spring.boot.demo.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @ClassName: MyDefinePartition
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/07 10:34:34
 * @Version: V1.0
 **/
public class MyDefinePartition implements Partitioner 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 
        // 获取消息
        String msgValue = value.toString();
        System.out.println("msgValue:"+msgValue);
        // 创建 partition
        int partition;
        // 判断消息是否包含 atguigu
        if (msgValue.contains("xinxiang"))
            partition = 0;
        else 
            partition = 1;
        
        // 返回分区号
        return partition;
    

    @Override
    public void close() 

    

    @Override
    public void configure(Map<String, ?> map) 

    

发送代码

  /**
    * @author liujianfu
    * @description       自定义分区策略
    * @date 2022/4/7 10:37
    * @param []
    * @return void
    */
    public static void zidingyicelvPartition() throws InterruptedException 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljf.spring.boot.demo.producer.MyDefinePartition");
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) 
            // 添加回调
            String info="4月5号-xinxiang";
            if(i%2==0)
                info="4月7号";
            
            info=info+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
            kafkaProducer.send(new ProducerRecord<>("kafka-ljf",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception exception) 
                    if (exception == null) 
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                     else 
                        // 出现异常打印
                        exception.printStackTrace();
                    
                
            );
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2000);
        
        // 5. 关闭资源
        kafkaProducer.close();
    

 

3.消费端进行消费

[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf  --bootstrap-server  192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092

 4.console打印结果

 

以上是关于kafka的生产者的分区策略的主要内容,如果未能解决你的问题,请参考以下文章

kafka消费者分区分配策略

Kafka生成消息时的3种分区策略

Kafka生成消息时的3种分区策略

Kafka的生产者与消费者机制+分区策略你这还不懂?

大数据消息中间件之Kafka02

kafka学习笔记 & 面经分享