Kafka生产者

Posted Sierra、

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者相关的知识,希望对你有一定的参考价值。

Kafka生产者

生产者的首要功能就是向某个topic的某个分区发送一条消息,分区器决定了生产者要向topic的哪个分区写入消息。Kafaka producer提供了一个默认的分区器,如果发送的消息指定了key,那么分区器会根据key的哈希值选择目标分区,假如将要发送的消息没有指定key,分区器会采用轮询策略确认目标分区,这样保证了消息在所有分区上保持均匀。

生产者发送消息的流程

首先需要创建一个ProduccerRecord对象,该对象包含了目标主题和要发送的内容,发送ProducerRecord对象时生产者需要把键和值对象序列化成字节数组才能在网络上传输。接着数据传给分区器,分区器根据分区策略将消息发送到主题和分区。然后这条消息被添加到一个记录批次里,这个批次里的所有消息都会被发送到相同的主题和分区上,并且有一个独立的线程负责把这些记录批次发送到相应的broker。服务器收到消息后会返回响应,如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,并且记录了消息在分区内的偏移量。如果写入失败,则返回错误信息,并且会尝试重新发送消息。如果重试超过一定次数,那么直接返回错误信息。具体流程如下图所示。

Broker主要参数

Broker需要配置存储信息,即Broker需要使用哪些磁盘。关于存储的相关参数重点有以下几个。

log.dirs

指定了 Broker 需要使用的若干个文件目录路径,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。

  • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。还记得上一期我们关于 Kafka 是否需要使用 RAID 的讨论吗?这个改进正是我们舍弃 RAID 方案的基础:没有这种 Failover 的话,我们只能依靠 RAID 来提供保障。

listeners

监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务

从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092

advertised.listeners

组监听器是 Broker 用于对外发布的,主要是为外网访问用

 

Topic参数

auto.create.topics.enable

是否允许自动创建 Topic,最好设置成false,即不允许自动创建 Topic

unclean.leader.election.enable

关闭 Unclean Leader 选举

auto.leader.rebalance.enable

设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。

你要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

 

 

 

 

kafka生产者多种实现方式

参考技术A 本文介绍kafka生产者多种实现方式,
方式1:
编写一个Produce类继承kafkaProduce:
public class Producer extends KafkaProducer<String, String>
public Producer(Properties properties)
super(properties);



编写ProduceUtils工具类:
public class ProduceUtils

// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.knowyou.tt.NewPartitioner");//自定义分区函数
producer = new Producer(properties);
System.out.println("loading the properities......");


编写回调函数Callback:
public class ProduceCallback implements Callback
private static final Logger log = LoggerFactory.getLogger(ProduceCallback.class);



调用ProduceUtils即可以实现生产功能。

方式2:
编写一个生产者的工厂类:
public class ProduceFactory
private static final Logger logger = Logger.getLogger(ProduceFactory.class);

3,采用spring-kafka的模式生产数据
先做一个工厂类:
public class ProduceFactory



再做一个工具类:
public class KafkaTool
private static KafkaTemplate<String, String> template = ProduceFactory.getKafkaTemplate();

以上是关于Kafka生产者的主要内容,如果未能解决你的问题,请参考以下文章

kafka源码分析 生产消息过程

kafka生产者的发送消息的流程以及代码案例

大数据(6f)图解Kafka生产者和消费者API

spring kafka中是不是有多个生产者的代码示例?

如何保证kafka生产者发送消息的可靠性

Java API中kafka生产者发送消息没有成功