Kafka生产者
Posted Sierra、
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者相关的知识,希望对你有一定的参考价值。
生产者的首要功能就是向某个topic
的某个分区发送一条消息,分区器决定了生产者要向topic
的哪个分区写入消息。Kafaka producer提供了一个默认的分区器,如果发送的消息指定了key,那么分区器会根据key的哈希值选择目标分区,假如将要发送的消息没有指定key,分区器会采用轮询策略确认目标分区,这样保证了消息在所有分区上保持均匀。
生产者发送消息的流程
Broker主要参数
Broker需要配置存储信息,即Broker需要使用哪些磁盘。关于存储的相关参数重点有以下几个。
log.dirs
指定了 Broker 需要使用的若干个文件目录路径,在线上生产环境中一定要为log.dirs
配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3
这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
-
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
-
能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。还记得上一期我们关于 Kafka 是否需要使用 RAID 的讨论吗?这个改进正是我们舍弃 RAID 方案的基础:没有这种 Failover 的话,我们只能依靠 RAID 来提供保障。
listeners
监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>
。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092
。
advertised.listeners
组监听器是 Broker 用于对外发布的,主要是为外网访问用
Topic参数
auto.create.topics.enable
是否允许自动创建 Topic,最好设置成false,即不允许自动创建 Topic
unclean.leader.election.enable
关闭 Unclean Leader 选举
auto.leader.rebalance.enable
设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true
,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。
你要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。
kafka生产者多种实现方式
参考技术A 本文介绍kafka生产者多种实现方式,方式1:
编写一个Produce类继承kafkaProduce:
public class Producer extends KafkaProducer<String, String>
public Producer(Properties properties)
super(properties);
编写ProduceUtils工具类:
public class ProduceUtils
// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.knowyou.tt.NewPartitioner");//自定义分区函数
producer = new Producer(properties);
System.out.println("loading the properities......");
编写回调函数Callback:
public class ProduceCallback implements Callback
private static final Logger log = LoggerFactory.getLogger(ProduceCallback.class);
调用ProduceUtils即可以实现生产功能。
方式2:
编写一个生产者的工厂类:
public class ProduceFactory
private static final Logger logger = Logger.getLogger(ProduceFactory.class);
3,采用spring-kafka的模式生产数据
先做一个工厂类:
public class ProduceFactory
再做一个工具类:
public class KafkaTool
private static KafkaTemplate<String, String> template = ProduceFactory.getKafkaTemplate();
以上是关于Kafka生产者的主要内容,如果未能解决你的问题,请参考以下文章