kafka的生产者的分区策略
Posted 健康平安的活着
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka的生产者的分区策略相关的知识,希望对你有一定的参考价值。
一 kafka分区策略
1.1 分区的好处
分区的好处:便于将数据进行合理存储,将数据分割成一块一块的存储到不同的broker中。
提高并行度:生产者按分区为单位进行生产,消费者按分区为单位进行消费。
1.2 分区的策略
1.给定了分区号,直接将数据发送到指定的分区里面去。
2.没有给定分区号,通过key的hashcode,和 topic的分区数,进行取模。
例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
3.既没有给定分区号,也没有给定key值,直接轮循进行分区。
Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。
4.自定义分区。根据业务需要制定以分区策略。
二 具体案例分析
2.1 制定分区
将数据发往指定 partition 的情况 下,例如,将所有数据发往分区 1 中。 1.代码 /**
* @author liujianfu
* @description 制定分区去生产数据
* @date 2022/4/7 10:08
* @param []
* @return void
*/
public static void zhidingPartion() throws InterruptedException
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 10; i++)
// 添加回调
String info="4月5号-xx"+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
kafkaProducer.send(new ProducerRecord<>("kafka-ljf",1,"test",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception)
if (exception == null)
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
else
// 出现异常打印
exception.printStackTrace();
);
// 延迟一会会看到数据发往不同分区
Thread.sleep(2000);
// 5. 关闭资源
kafkaProducer.close();
2.消费端进行消费
[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf --bootstrap-server 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
3.console控制台打印日志
2.2 key值取模
没有指明 partition 值但有 key 的情况下 ,将 key 的 hash 值与 topic 的 partition 数进行取 余得到 partition 值。 1.代码 /**
* @author liujianfu
* @description 分区取模
* @date 2022/4/7 10:18
* @param []
* @return void
*/
public static void qumoPartition() throws InterruptedException
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 10; i++)
// 添加回调
String info="4月5号-qumo"+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
int partition=0;
if(i%2==0)
partition=1;
kafkaProducer.send(new ProducerRecord<>("kafka-ljf",partition,"test",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception)
if (exception == null)
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
else
// 出现异常打印
exception.printStackTrace();
);
// 延迟一会会看到数据发往不同分区
Thread.sleep(2000);
// 5. 关闭资源
kafkaProducer.close();
2.消费端进行消费
[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf --bootstrap-server 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
3.console消费情况:有数据发往分区0,有的数据发往分区1
2.3 制定规则进行分区
1要求:我们实现一个分区器实现,发送过来的数据中如果包含 xinxiang,就发往 0 号分区,
不包含 atguigu ,就发往 1 号分区。 2.编写自定义分区器 ( 1 )定义类实现 Partitioner 接口。 (2)重写 partition() 方法。package com.ljf.spring.boot.demo.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @ClassName: MyDefinePartition
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/07 10:34:34
* @Version: V1.0
**/
public class MyDefinePartition implements Partitioner
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
// 获取消息
String msgValue = value.toString();
System.out.println("msgValue:"+msgValue);
// 创建 partition
int partition;
// 判断消息是否包含 atguigu
if (msgValue.contains("xinxiang"))
partition = 0;
else
partition = 1;
// 返回分区号
return partition;
@Override
public void close()
@Override
public void configure(Map<String, ?> map)
发送代码
/**
* @author liujianfu
* @description 自定义分区策略
* @date 2022/4/7 10:37
* @param []
* @return void
*/
public static void zidingyicelvPartition() throws InterruptedException
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljf.spring.boot.demo.producer.MyDefinePartition");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++)
// 添加回调
String info="4月5号-xinxiang";
if(i%2==0)
info="4月7号";
info=info+i+" 时间为:"+ DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss");
kafkaProducer.send(new ProducerRecord<>("kafka-ljf",info), new Callback() // 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception)
if (exception == null)
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
else
// 出现异常打印
exception.printStackTrace();
);
// 延迟一会会看到数据发往不同分区
Thread.sleep(2000);
// 5. 关闭资源
kafkaProducer.close();
3.消费端进行消费
[root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --from-beginning --topic kafka-ljf --bootstrap-server 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
4.console打印结果
以上是关于kafka的生产者的分区策略的主要内容,如果未能解决你的问题,请参考以下文章