Kafaka基础快速入门
Posted 长安不及十里
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafaka基础快速入门相关的知识,希望对你有一定的参考价值。
Kafaka基本入门
文章目录
一 基本认识
1.1 消息中间件(消息队列)
-
消息(Message):是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
-
队列(Queue):消息队列,用来保存消息直到发送给消费者。是一种数据结构,先进进出。
-
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。这也是消息中间件的意义所在。
1.2 常用消息中间件
ActiveMQ
:是 Apache开源产品,完全支持 J M S 规范的消息中间件,是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ
便可执行。ActiveMQ
可以很容易内嵌到使用Spring
的系统里面去通过了常见J2EE
服务器的测试。JMS
即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API
,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。其丰富的API
、多种集群构建模式使得他成为业界老牌消息中间件,在中小型企业中应用广泛!Kafka
: 是由Linkedin
公司开发的,它是一个分布式的,支持多分区、多副本,基于Zookeeper
的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。KAFKA
基于TCP协议。RocketMQ
:阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq
,3.0版本名称改为RocketMQ
,是阿里参照kafka
设计思想使用java
实现的一套mq
。同时将阿里系内部多款mq
产品(Notify、metaq
)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq
的架构,目前主要多用于订单交易系统。RabbitMQ
:使用Erlang
编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP
,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB
整合。ZeroMQ
:号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ
能够实现RabbitMQ
不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。
1.3 通信协议
-
AMQP
:Advanced Message Queuing Protocol
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。 -
MQTT
:(Message Queuing Telemetry Transport,消息队列遥测传输)
是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。 -
STOMP
:(Streaming Text Orientated Message Protocol)
是流文本定向消息协议,是一种为MOM(Message Oriented Middleware
,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。 -
XMPP
:(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)
是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。 -
其他:有些特殊框架(如:
redis、kafka、zeroMq
等)根据自身需要未严格遵循MQ规范,而是基于TCP\\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。
1.4 基本术语
Broker
消息服务器,作为server提供消息核心服务
Producer
消息生产者,业务的发起方,负责生产消息传输给broker,
Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
Topic
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播
Queue
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
Message
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
二 kafaka的基本介绍
2.1 概述
Kafka是最初由Linkedin
公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper
协调的分布式日志系统(也可以当做MQ
系统),常见可以用于web/nginx
日志、访问日志,消息服务等等,Linkedin
于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展
2.2 消息系统介绍
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。
2.3 点对点消息传递模式
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
生产者发送一条消息到queue,只有一个消费者能收到。
2.4 发布-订阅消息传递模式
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
三 Kafka中的术语解释
3.1 概述
在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系:
3.2 broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
3.3 Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,类似于数据库的表名。
3.4 Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
3.5 Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
3.6 Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
3.7 Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
3.8 Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
3.9 Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
四 Kafaka的安装
4.1 Zookeeper的安装
- 首页:Apache ZooKeeper
- 安装
# 解压
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
# 修改配置文件
cd conf
cp coo_sample.cfg zoo.cfg
vim zoo.cfg
#启动
bin/zkServer.sh start
#查看
jps
#状态查看
bin/zkServer.sh status
#停止
bin/zkServer.sh stop
#启动客户端
bin/zkCli.sh
#退出
quit
tickTime = 2000
:通信心跳时间,Zookeeper
服务器与客户端心跳时间,单位毫秒initLimit = 10
:LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime
的数量)syncLimit = 5
:LF同步通信时限,Leader和Follower之间通信时间如果超过syncLimit * tickTime
,Leader认为Follwer
死 掉,从服务器列表中删除Follwer
。dataDir
:保存Zookeeper
中的数据,注意:默认的tmp
目录,容易被Linux系统定期删除,所以一般不用默认的tmp
目录。clientPort = 2181
:客户端连接端口,通常不做修改。
4.2 Kafka的安装
- 官网:Apache Kafka
#解压
tar -zxvf kafka_2.11-2.4.0.tgz
#修改配置文件
cd config
vim server.properties
# 修改以下配置
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号(内网)
#listeners=PLAINTEXT://服务器地址:9092
#阿里云外网
advertised.listeners=PLAINTEXT://阿里云地址:9092
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
#是否可以删除
delete.topic.enable=true
# 启动
cd bin
./kafka-server-start.sh -daemon ../config/server.properties
# 检查是否启动
jps
#查看端口问题
netstat -an | grep 9092
#或者
lsof -i:9092
# 防火墙开发端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
#停止kafka
./kafka-server-stop.sh ../config/server.properties
4.3 基本命令
注:这些命令我们不需要记,因为我们是在代码中完成这些命令
4.3.1 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my
4.3.2 查看创建的topic
./kafka-topics.sh --list --zookeeper localhost:2181
4.3.3 删除某个topic
删除topic的前提是需要将kafka的消费者和生产者停止
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my
4.3.4 查看某个topic的信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
4.3.5 发送消息
./kafka-console-producer.sh --broker-list 服务器地址:9092 --topic my
4.3.6 接受消息
# 重头消费
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my --from-beginning
# :从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my
4.3.7 消息的有序性
- ⽣产者将消息发送给broker,broker会将消息保存在本地的⽇志⽂件中
- 消息的保存是有序的,通过offset偏移量来描述消息的有序性
- 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置
4.4 消费者组
4.4.1 单播消费
在⼀个kafka的topic中,启动两个消费者,⼀个⽣产者,问:⽣产者发送消息,这条消息是否 同时会被两个消费者消费? 如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。换⾔ 之,同⼀个消费组中只能有⼀个消费者收到⼀个topic中的消息。
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup --topic my --from-beginning
4.4.2 多播消费
不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也 是多个消费组中的多个消费者收到了同⼀个消息。
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup01 --topic my --from-beginning
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup02 --topic my --from-beginning
4.4.3 查看消费组的信息
./kafka-consumer-groups.sh --bootstrap-server 服务器地海:9092 --describe --group testGroup
重点关注以下⼏个信息:
- current-offset: 最后被消费的消息的偏移量
- Log-end-offset: 消息总量(最后⼀条消息的偏移量)
- Lag:积压了多少条消息
五 Kafka中主题和分区的概念
5.1 主题
主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被 订阅该topic的消费者消费。 但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被 保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念
5.2 分区
通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:
-
分区存储,可以解决统⼀存储⽂件过⼤的问题
-
提供了读写的吞吐量:读和写可以同时在多个分区中进⾏
./kafka-topics.sh --create --zookeeper localhost:2181 --replicationfactor 1 --partitions 2 --topic test
5.3 日志信息
- 00000.log: 这个⽂件中保存的就是消息
- __consumer_offsets-49: kafka内部⾃⼰创建了__consumer_offsets主题包含了50个分区。这个主题⽤来存放消费 者消费某个主题的偏移量。因为每个消费者都会⾃⼰维护着消费的主题的偏移量,也就是 说每个消费者会把消费的主题的偏移量⾃主上报给kafka中的默认主题: consumer_offsets。
- 因此kafka为了提升这个主题的并发性,默认设置了50个分区。 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前 offset的值 ⽂件中保存的消息,默认保存7天。
- 七天到后消息会被删除。
六 Kafka集群的搭建
6.1 Zookeeper集群的搭建
-
注意开放端口,以及关闭防火墙
-
ip:2181,ip:2182,ip:2183
-
修改配置文件
cd conf
#修改配置文件
vim zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/environment/zookeeper/apache-zookeeper-3.6.3-bin/data_log
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=ip1:2888:3888
server.2=ip2:2888:3888
server.3=ip3:2888:3888
quorumListenOnAllIPs=true
#启动zookeeper,修改其他机器的配置文件
bin/zkServer.sh start、
# 等待一下,查看选举状态
bin/zkServer.sh status
[root@shu apache-zookeeper-3.6.3-bin]# bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /environment/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader
[root@shu apache-zookeeper-3.6.3-bin]#
# 问题:端口开发问题,防火墙问题
# 防火墙开发端口
firewall-cmd --zone=public --add-port=2182/tcp --permanent
firewall-cmd --reload
#关闭防火墙
systemctl stop firewalld
6.2 Kafka集群的搭建
-
注意开放端口,以及关闭防火墙
-
ip:9092,ip:9093,ip:9094
-
修改配置文件
cd config
#修改配置文件
vim server.properties
#修改zookeeper连接
zookeeper.connect=ip:2181,ip:2182,ip:2183
# 分布修改三台的机器的配置文件,并启动
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
./kafka-server-start.sh -daemon ../config/server.properties
# 检查是否启动
jps
#查看端口问题
netstat -an | grep 9092
#或者
lsof -i:9092
# 防火墙开发端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
#停止kafka
./kafka-server-stop.sh ../config/server.properties
# 验证,我们在lead机器上面创建一个topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic my
#查看其余机器上的topic
[root@xlc bin]# ./kafka-topics.sh --list --zookeeper localhost:2183
my
[root@shu bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
my
6.3 副本的概念
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个 副本作为leader,其他是follower(就是备份)
# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
# 查看topic详细信息
./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
[root@shu bin]# ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
Created topic my-replicated-topic.
[root@shu bin]# ./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
-
leader: kafka的写和读的操作,都发⽣在leader上。
-
leader负责把数据同步给follower。当leader挂 了,经过主从选举,从多个follower中选举产⽣⼀个新的leader follower 接收leader的同步的数据
-
isr:可以同步和已同步的节点会被存⼊到isr集合中。这⾥有⼀个细节:如果isr中的节点性能较差,会被提出isr集合。
-
集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存 储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。
6.4 集群消费
- 我们在领导服务器中,创建主体,发送消息
# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
# 查看topic信息
./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
# 创建消息
./kafka-console-producer.sh --broker-list ip:9093 --topic my-replicated-topic
>nihao
- 其余机器接受消息
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server ip:9093 --topic my-replicated-topic
- 集群消费组命令,参考前面的消费者组命令
- ⼀个partition只能被⼀个消费组中的⼀个消费者消费,⽬的是为了保证消费的顺序性,但 是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺 序性呢?
- partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超 过partition的数量,否则多的消费者消费不到消息
6.5 集群中的controller
- 集群中谁来充当controller 每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最⼩的那个broker将会作为集 群中的controller,
- 负责这么⼏件事: 当集群中有⼀个副本的leader挂掉,需要在集群中选举出⼀个新的leader,选举的规则是 从isr集合中最左边获得。
- 当集群中有broker新增或减少,controller会同步信息给其他broker 当集群中有分区新增或减少,controller会同步信息给其他broker
6.6 rebalance机制
- 前提:消费组中的消费者没有指明分区来消费 触发的条件:当消费组中的消费者和分区的关系发⽣变化的时候
- 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
- range:根据公示计算得到每个消费消费哪⼏个分区:前⾯的消费者是分区总数/消费 者数量+1,之后的消费者是分区总数/消费者数量
- 轮询:⼤家轮着来
- sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之 前的分配情况。如果这个策略没有开,那么就要进⾏全部的重新分配。建议开启。
6.8 HW和LEO
- LEO是某个副本最后消息的消息位置(log-end-offset)
- HW是已完成同步的位置。消息在写⼊broker时,且每个broker完成这条消息的同步后,hw 才会变化。在这之前消费者是消费不到这条消息的。
- 在同步完成之后,HW更新之后,消费者 才能消费到这条消息,这样的⽬的是防⽌消息的丢失。
七 代码中的实现
7.1 消息提供者
7.1 .1 Java消息提供者代码中的实现
- 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
- 代码
/**
* @Author shu
* @Date: 2021/10/22/ 16:25
* @Description
**/
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.设置参数
Properties props = new Properties();
//领导者主机
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//2.创建⽣产消息的客户端,传⼊参数
Producer<String,String> producer = new KafkaProducer<String, String>(props);
//3.创建消息
//key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"value","hello-kafka-ok");
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
- 查看结果
可以发现我们的消费者已经收到了消息
7.1.2 ⽣产者中的ack的配置
同步
-
ack = 0 kafka-cluster
:不需要任何的broker收到消息,就⽴即返回ack给⽣产者,最容易 丢消息的,效率是最⾼的 -
ack=1(默认)
: 多副本之间的leader已经收到消息,并把消息写⼊到本地的log中,才 会返回ack给⽣产者,性能和安全性是最均衡的 -
ack=-1/all
:⾥⾯有默认的配置min.insync.replicas=2(默认为1,推荐配置⼤于等于2), 此时就需要leader和⼀个follower同步完后,才会返回ack给⽣产者(此时集群中有2个 broker已完成数据的接收),这种⽅式最安全,但性能最差。
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造
成消息重复发送,⽐如⽹络抖动,所以需要在
接收者那边做好消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
-
producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
-
producer 将消息发送给该 leader
-
leader 将消息写入本地 log
-
followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
-
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
异步
异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker在收到消息后异步调⽤⽣产 者提供的callback回调⽅法。但是容易造成消息丢失。
//异步发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception
exception) {
if (exception != null) {
System.err.println("发送消息失败:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步⽅式发送消息结果:" + "topic-" +
metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
7.1.3 消息缓冲区
- kafka默认会创建⼀个消息缓冲区,⽤来存放要发送的消息,缓冲区是32m
- kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker
- 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker 七、Java客户端消费者的实现细节
//缓存区默认大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//拉取数据默认大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//如果数据未满16k,也提交
props.put(ProducerConfig.LINGER_MS_CONFIG,10);
7.2 消息消费者
7.2.1 java客服端基本实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.以上是关于Kafaka基础快速入门的主要内容,如果未能解决你的问题,请参考以下文章
spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)
MQ之主流MQ:kafaka+RocketMQ+RabbitMQ对比