kafka2.5.0自定义分区器
Posted zhuwenjoyce
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka2.5.0自定义分区器相关的知识,希望对你有一定的参考价值。
自定义分区器:
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; /** * @author King老师 */ public class SelfPartitioner implements Partitioner { public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //拿到 List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); //TODO 分区数 int num = partitionInfos.size(); //TODO 根据value与分区数求余的方式得到分区ID int parId = ((String)value).hashCode()%num; return parId; } public void close() { //do nothing } public void configure(Map<String, ?> configs) { //do nothing } }
配置到kafka:
ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.enjoyedu.selfpartition.SelfPartitioner"
更多配置参考:《kafka2.5.0生产者与消费者配置详解》
end.
以上是关于kafka2.5.0自定义分区器的主要内容,如果未能解决你的问题,请参考以下文章