Spark基础学习笔记31:Kafka分布式消息系统
Posted howard2005
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark基础学习笔记31:Kafka分布式消息系统相关的知识,希望对你有一定的参考价值。
文章目录
零、本讲学习目标
- 掌握Kafka的架构原理
- 掌握Kafka的主题、分区、消费者组的概念
- 掌握Kafka的数据存储机制
- 掌握Kafka集群环境的搭建
- 掌握Kafka Java API的操作
- 掌握Kafka生产者拦截器的使用
一、Kafka概述
- Kafka官网:https://kafka.apache.org/
(一)什么是Kafka
- 在
Spark
生态体系中,Kafka
占有非常重要的位置。Kafka
是一个使用Scala
语言编写的基于ZooKeeper
的高吞吐量低延迟的分布式发布与订阅消息系统,它可以实时处理大量消息数据以满足各种需求,比如基于Hadoop
的批处理系统,低延迟的实时系统等。即便使用非常普通的硬件,Kafka每秒也可以处理数百万条消息、其延迟最低只有几毫秒。 - 在实际开发中,
Kafka
常常作为Spark Streaming
的实时数据源,Spark Streaming
从Kafka`中读取实时消息进行处理,保证了数据的可靠性与实时性。二者是实时消息处理系统的重要组成部分。 - 那么
Kafka
到底是什么?简单来说,Kafka
是消息中间件的一种。举一个生产者与消费者的例子:生产者生产鸡蛋,消费者消费鸡蛋。假设消费者消费鸡蛋的时候噎住了(系统宕机了),而生产者还在生产鸡蛋,那么新生产的鸡蛋就丢失了;再比如,生产者1秒钟生产100个鸡蛋(大交易量的情况),而消费者1秒钟只能消费50个鸡蛋,那过不了多长时间,消费者就吃不消了(消息堵塞,最终导致系统超时),导致鸡蛋又丢失了。这个时候我们放个篮子在生产者与消费者中间,生产者生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,这个篮子就相当于Kafka
。 - 上述例子中的鸡蛋则相当于
Kafka
中的消息,篮子相当于存放消息的消息队列,也就是Kafka
集群。当篮子满了,鸡蛋放不下了,这时再加几个篮子,就是Kafka
集群扩容。
(二)Kafka中的基本概念
1、消息(Message)
- Kafka的数据单元被称为消息。可以把消息看成是数据库里的一行数据或一条记录。为了提高效率,消息可以分组传输,每一组消息就是一个批次,分成批次传输可以减少网络开销。但是批次越大,单位时间内处理的消息就越大,因此要在吞吐量和时间延迟之间做出权衡。
2、服务器节点(Broker)
- Kafka集群包含一个或多个服务器节点,一个独立的服务器节点被称为
Broker
(经纪人)。
3、主题(Topic)
- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为主题。在物理上,不同主题的消息分开存储;在逻辑上,一个主题的消息虽然保存于一个或多个
Broker
上,但用户只需指定消息的主题即可生产或消费消息,而不必关心消息存于何处。主题在逻辑上可以被认为是一个队列。每条消息都必须指定它的主题,可以简单理解为必须指明把这条消息放进哪个队列里。
4、分区(Partition)
- 为了使Kafka的吞吐率可以水平扩展,物理上把主题分成一个或多个分区。创建主题时可指定分区数量。每个分区对应于一个文件夹,该文件夹下存储该分区的数据和索引文件。
5、生产者(Producer)
- 负责发布消息到Kafka的
Broker
,实际上属于Broker
的一种客户端。生产者负责选择哪些消息应该分配到哪个主题内的哪个分区。默认生产者会把消息均匀地分布到特定主题的所有分区上,但在某些情况下,生产者会将消息直接写到指定的分区。
6、消费者(Consumer)
- 从Kafka的Broker上读取消息的客户端。读取消息时需要指定读取的主题,通常消费者会订阅一个或多个主题,并按照消息生成的顺序读取它们。
二、Kafka架构
(一)Kafka消息传递流程图
- 生产者将消息发送给Kafka集群,同时Kafka集群将消息转发给消费者
(二)Kafka集群架构图
- 在Kafka中,客户端和服务器之间的通信是通过一个简单的、高性能的、与语言无关的TCP协议完成的。该协议进行了版本控制,并与旧版本保持向后兼容。Kafka不仅提供Java客户端,也提供其他多种语言的客户端。
- 一个典型的Kafka集群中包含若干生产者(数据可以是Web前端产生的页面内容或者服务器日志等)、若干Broker、若干消费者(可以是Hadoop集群、实时监控程序、数据仓库或其他服务)以及一个ZooKeeper集群。ZooKeeper用于管理和协调Broker。当Kafka系统中新增了Broker或者某个Broker故障失效时,ZooKeeper将通知生产者和消费者。生产者和消费者据此开始与其他Broker协调工作。
- Kafka集群架构图
- 生产者使用Push模式将消息发送到Broker,而消费者使用Pull模式从Broker订阅并消费消息
三、主题与分区
(一)主题与分区的关系
- Kafka通过主题对消息进行分类,一个主题可以分为多个分区,且每个分区可以存储于不同的Broker上,也就是说,一个主题可以横跨多个服务器。
- 如果你对HBase的集群架构比较了解,用HBase数据库做类比,可以将主题看作HBase数据库中的一张表,而分区则是将表数据拆分成多个部分,即HRegion。不同的HRegion可以存储于不同的服务器上,而分区也是如此。
(二)主题分区的好处
- 对主题进行分区的好处:
允许主题消息规模超出一台服务器的文件大小上限
。因为一个主题可以有多个分区,且可以存储在不同的服务器上,当一个分区的文件大小超出了所在服务器的文件大小上限时,可以动态添加其他分区,因此可以处理无限量的数据。 - Kafka会为每个主题维护一个
分区日志
,记录各个分区的消息存放情况。消息以追加的方式写入每个分区的尾部,然后以先入先出的顺序进行读取。由于一个主题包含多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证单个分区内消息的顺序
。 - 当一条消息被发送到Broker时,会根据分区规则被存储到某个分区里。如果分区规则设置得合理,所有消息将被均匀地分配到不同的分区里,这样就实现了水平扩展。如果一个主题的消息都存放到一个文件中,那么该文件所在的Broker的I/O将成为主题的性能瓶颈,而分区正好解决了这个问题。
(三)分区消息的读写
- 分区中的每个记录都被分配了一个
偏移量(Offset)
,偏移量是一个连续递增的整数值,它唯一标识分区中的某个记录。而消费者只需保存该偏移量即可,当消费者客户端向Broker发起消息请求时需要携带偏移量。 - 例如,消费者向Broker请求主题
test
的分区0
中的偏移量从20
开始的所有消息以及主题test
的分区1
中的偏移量从35
开始的所有消息。当消费者读取消息后,偏移量会线性递增。当然,消费者也可以按照任意顺序消费消息,比如读取已经消费过的历史消息(将偏移量重置到之前版本)。 - 此外,消费者还可以指定从某个分区中一次最多返回多少条数据,防止一次返回数据太多而耗尽客户端的内存。
(四)消息保留的策略
- 对于已经发布的消息,无论这些消息是否被消费,Kafka都将会保留一段时间,具体的保留策略有两种:根据时间保留(例如7天)和根据消息大小保留(例如1GB),可以进行相关参数配置,选择具体策略。当消息数量达到配置的策略上限时,Kafka就会为节省磁盘空间而将旧消息删除。
- 例如,设置消息保留两天,则两天内该消息可以随时被消费,但两天后该消息将被删除。Kafka的性能对数据大小不敏感,因此保留大量数据毫无压力。
- 每个主题也可以配置自己的保留策略,可以根据具体的业务进行设置。例如,用于跟踪用户活动的数据可能需要保留几天,而应用程序的度量指标可能只需要保留几个小时。
四、分区副本
(一)分区的复制示意图
- 在Kafka集群中,为了提高数据的可靠性,同一个分区可以复制多个副本分配到不同的Broker,这种方式类似于HDFS中的副本机制。如果其中一个Broker宕机,其他Broker可以接替宕机的Broker,不过生产者和消费者需要重新连接到新的Broker。
(二)分区副本分为两种类型
- Kafka每个分区的副本都被分为两种类型:
领导者副本
和跟随者副本
。领导者副本只有一个,其余的都是跟随者副本。所有生产者和消费者都向领导者副本发起请求,进行消息的写入与读取,而跟随者副本并不处理客户端的请求,它唯一的任务是从领导者副本复制消息,以保持与领导者副本数据及状态的一致。如果领导者副本发生崩溃,就会从其余的跟随者副本中选出一个作为新的领导者副本。 - 领导者与跟随者在Kafka集群中分布图
(三)跟随者如何与领导者保持同步
1、提出问题
- 既然跟随者副本会从领导者副本那里复制消息,那么这种复制是领导者主动向跟随者发起Push(推送)请求还是跟随者向领导者发起Pull(拉取)请求?
2、分析问题
- 跟随者为了与领导者保持同步,会周期性地向领导者发起获取数据的请求(Pull),这种请求与消费者读取消息发送的请求是一样的。请求消息里包含跟随者想要获取消息的偏移量,偏移量的值随着每次请求进行递增。领导者从跟随者请求的偏移量可以知道消息复制的进度。
3、解决问题
- 领导者与跟随者之间的消息复制什么时候才认为是成功的呢? 是同步复制还是异步复制?如果个别跟随者由于网络问题导致消息没有复制完成,是否允许消费者对消息进行读取?
- Kafka的消息复制是以分区为单位的,既不是完全的同步复制,又不是完全的异步复制,而是基于ISR(In-Sync Replica) 的动态复制方案。
- 领导者会维护一个需要与其保持同步的副本列表(包括领导者自己),该列表称为
ISR
,且每个分区都会有一个ISR
。如果在一定时间内(可以通过参数replica.lag.time.max.ms
进行配置),跟随者没有向领导者请求新的消息(可能由于网络问题),该跟随者将被认为是不同步的,领导者会从ISR
中将其移除,从而避免因跟随者的请求速度过慢而拖慢整体速度。而当跟随者重新与领导者保持同步,领导者会将其再次加入ISR
中。当领导者失效时,也不会选择ISR
中不存在的跟随者作为新的领导者。 ISR
的列表数据保存在ZooKeeper
中,每次ISR
改变后,领导者都会将最新的ISR
同步到ZooKeeper
中。- 每次消息写入时,只有
ISR
中的所有跟随者都复制完毕,领导者才会将消息写入状态置为Commit
(写入成功),而只有状态置为Commit
的消息才能被消费者读取。从消费者的角度来看,要想成功读取消息,ISR
中的所有副本必须处于同步状态,从而提高数据的一致性。 - 可以通过设置
min.insync.replicas
参数指定ISR
的最小数量,默认为1
,即ISR
中的所有跟随者都可以被移除,只剩下领导者。但是在这种情况下,如果领导者失效,由于ISR
中没有跟随者,因此该分区将不可用。适当增加参数min.insync.replicas
的值将提高系统的可用性。 - 以上是站在消费者的角度来看
ISR
,即ISR
中所有的副本都成功写入消息后,才允许消费者读取。那么对于生产者来说,在消息发送完毕后,是否需要等待ISR
中所有副本成功写入才认为消息发送成功? - 生产者通过
ZooKeeper
向领导者副本所在的Broker
发送消息,领导者收到消息后需要向生产者返回消息确认收到的通知。而实际上,领导者并不需要等待ISR
中的所有副本都写入成功才向生产者进行确认。 - 在生产者发送消息时,可以对
acks
参数进行配置,该参数指定了对消息写入成功的界定。生产者可以通过acks
参数指定需要多少个副本写入成功才视为该消息发送成功。acks
参数有3
个值,分别为1
、all
和0
。若acks=1(默认值)
,只要领导者副本写入成功,生产者就认为写入成功;acks=all
,则需要ISR中的所有副本都写入成功,生产者才能认为写入成功;若acks=0
,则生产者将消息发送出去后,立即认为该消息发送成功,不需要等待Broker的响应(而实际上该消息可能发送失败)。因此,需要根据实际业务需求来设置acks
的值。 - 对于Broker来说,Broker会通过
acks
的值来判断何时向生产者返回响应。在消息被写入分区的领导者副本后,Broker开始检查acks
参数值,若acks
的值为1
或0
,则Broker立即返回响应给生产者;若acks的值为all
,则请求会被保存在缓冲区中,直到领导者检测到所有跟随者副本都已成功复制了消息,才会向生产者返回响应。
五、消费者组
(一)消费者组概念
- 消费者组(Consumer Group)实际上就是一组消费者的集合。每个消费者属于一个特定的消费者组(可为每个消费者指定组名称,消费者通过组名称对自己进行标识,若不指定组名称,则属于默认的组)。
(二)消费者组模式的特点
- 传统消息处理有两种模式:
队列模式
和发布订阅模式
。队列模式是指消费者可以从一台服务器读取消息,并且每个消息只被其中一个消费者消费;发布订阅模式是指消息通过广播方式发送给所有消费者。而Kafka提供了消费者组模式,同时具备这两种(队列和发布订阅)模式的特点。
(三)消费者组与分区的关系
- Kafka规定,同一消费者组内不允许多个消费者消费同一分区的消息,而不同的消费者组可以同时消费同一分区的消息。也就是说,分区与同一个消费者组中的消费者的对应关系是多对一,而不允许一对多。举个例子,如果同一个应用有100台机器,这100台机器属于同一个消费者组,同一条消息在100台机器中就只有一台能得到。如果另一个应用也需要同时消费同一个主题的消息,就需要新建一个消费者组并消费同一个主题的消息。
- 避免组内消费者竞争同一个分区的消息,但是不同组的消费者就可以竞争同一个分区的消息,比如分区0的消息被消费组A的消费者1与消费组B的消费者3共同消费,但是分区0的消息不会被消费组A里的消费者1与消费者2同时消费,也绝对不会被消费组B的消费者3、消费者4、消费者5和消费者6共同消费。
(四)消费者组的两种特殊情况
- 每条消息发送到主题后,只能发送给某个消费者组中的唯一一个消费者实例(可以是同一台服务器上的不同进程,也可以是不同服务器上的进程)。
1、所有消费者实例属于同一分组
- 如果所有消费者实例属于同一分组(有相同的分组名),该过程就是传统的队列模式,即同一消息只有一个消费者能得到
2、所有消费者都不属于同一分组
- 如果所有消费者都不属于同一分组,该过程就是发布订阅模式,即同一消息每个消费者都能得到。
(四)同一个分区只能被组中一个消费者消费
- 从Kafka架构图
- 属于同一个消费者组的3个消费者共同读取一个主题,其中的两个消费者各自读取一个分区,而另一个消费者同时读取了两个分区。消费者组保证了同一个分区只能被组中的一个消费者进行消费。
六、数据存储机制
(一)分区文件夹
- Kafka中的消息由主题进行分类,而主题在物理上又分为多个分区。那么分区是怎么存储数据的呢?
- 假设在Broker中有一个名为
sampletopic
的主题,该主题被分为4
个分区,则在Kafka消息存储目录(配置文件server.properties
中属性log.dirs
指定的目录)中会生成以下4
个文件夹,且这4
个文件夹可能分布于不同的Broker中。在Kafka数据存储中,每个分区的消息数据存储于一个单独的文件夹中,分区文件夹的命名规则为“主题名-分区编号
”,分区编号从0
开始,依次递增。
sampletopic-0
sampletopic-1
sampletopic-2
sampletopic-3
(二)segment(段)
1、segement概念
- 一个分区在物理上由多个segment(段)组成。segment是Kafka数据存储的最小单位。每个分区的消息数据会被分配到多个segment文件中,这种将分区细分为segment的方式,方便了旧消息(旧segment)的删除和清理,达到及时释放磁盘空间的效果。
2、segment文件的构成
- segment文件由两部分组成:索引文件(后缀为.index)和数据文件(后缀为.log),这两个文件一一相应,且成对出现。索引文件存储元数据,数据文件存储实际消息,索引文件中的元数据指向对应数据文件中消息的物理偏移地址。
3、segment文件的命名
- segment文件的命名由20位数字组成,同一分区中的第一个segment文件的命名编号从0开始,下一个segment文件的命名编号为上一个segment文件的最后一条消息的offset值。编号长度不够以0补充。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000258330.index
00000000000000258330.log
4、segement索引文件与数据文件的对应关系
- 索引文件的左侧为消息在该文件中的顺序(第几条消息),右侧为消息在数据文件中对应的偏移地址(实际物理存储位置)。可以看到,并不是所有的消息都会在索引文件中建立索引,而是采用每隔一定字节的数据建立一条索引,这种索引方式被称为
稀疏索引
。采用稀疏索引避免了索引文件占用过多的空间,从而提高了索引的查找速度。但缺点是,没有建立索引的消息不能一次定位到其在数据文件的物理位置,而是通过“二分法”定位到与其最近的消息的位置(小于等于需要查找的消息位置中的最大值),然后顺序进行扫描(此时顺序扫描的范围已经被缩小了),直到找到需要查找的消息。
5、消费者通过offset值查找消息
- 查找
offset=170413
的消息,Kafka的查找步骤
(1)通过offset值定位到索引文件
- 索引文件
00000000000000170410.index
的起始offset
值为170410+1=170411
,索引文件00000000000000258330.index
的起始offset
值为258330+1=258331
。根据二分查找法,可以快速定位到offset
值为170413
的消息的索引文件为00000000000000170410.index
。
(2)通过索引文件查询消息物理偏移地址
- 首先根据offset值查找到消息顺序,
offset
值为170413
的消息在索引文件00000000000000170410.index
中的消息顺序为170413-170410=3
。然后根据消息顺序查找(二分法)到消息在数据文件的物理偏移地址,消息顺序为3
的消息对应的物理偏移地址为256
。
(3)通过物理偏移地址定位到消息内容
- 根据查找到的物理偏移地址,到数据文件
00000000000000170410.log
中查找对应的消息内容(消息内容3
)。
七、搭建Kafka集群
(一)从官网下载Kafka
(二)上传到master虚拟机
- 将kafka安装包上传到master虚拟机
/opt
目录
(三)将Kafka安装包解压到指定目录
- 执行命令:
tar -zxvf kafka_2.11-2.0.0.tgz -C /usr/local
(四)配置Kafka环境变量
- 执行命令:
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.11-2.0.0
export PATH=$KAFKA_HOME/bin:$PATH
- 存盘退出,执行命令
source /etc/profile
,让环境变量生效
(五)配置Kafka服务器属性文件
- 进入Kafka配置目录,执行命令:
vim server.properties
- 在分布式环境中,建议至少修改以下配置项(若文件中无此配置项,则需要新增),其他配置项可以根据具体项目环境进行调优。
broker.id=1 # 当前server编号
num.partitions=3 # 配置分区数
default.replication.factor=2 # 消息备份副本数
listeners=PLAINTEXT://master:9092 #Socket监听的地址
log.dirs=/usr/local/kafka_2.11-2.0.0/kafka-logs # 日志存储目录
zookeeper.connect=master:2181 # 连接zookeeper
group.id=hw-kafka # 配置消费组id
- broker.id:每一个Broker都需要有一个标识符,使用
broker.id
表示,类似于ZooKeeper的myid
。broker.id必须是一个全局(集群范围)唯一的整数值,即集群中每个Kafka服务器的broker.id
的值不能相同。 - num.partitions:每个主题的分区数量,默认是1。需要注意的是,可以增加分区的数量,但是不能减少分区的数量。
- default.replication.factor:消息备份副本数,默认为1,即不进行备份。
• listeners:Socket监听的地址,用于Broker监听生产者和消费者请求,格式为listeners= security_protocol://host_name:port
。如果没有配置该参数,就默认通过Java的API(java.net.InetAddress.getCanonicalHostName())来获取主机名,端口默认为9092
,建议进行显式配置,避免多网卡时解析有误。 - log.dirs:Kafka消息数据的存储位置,可以指定多个目录,以逗号分隔。
- zookeeper.connect:ZooKeeper的连接地址。该参数是用逗号分隔的一组格式为hostname:port/path的列表,其中hostname为ZooKeeper服务器的主机名或IP地址;port是ZooKeeper客户端连接端口;/path是可选的ZooKeeper路径,如果不指定,就默认使用ZooKeeper根路径。
(六)将master虚拟机上的Kafka分发到slave1和slave2虚拟机
- 执行命令:
scp -r $KAFKA_HOME root@slave1:$KAFKA_HOME
- 执行命令:
scp -r $KAFKA_HOME root@slave2:$KAFKA_HOME
(七)将master虚拟机上的环境配置文件分发到slave1和slave2虚拟机
- 执行命令:
scp /etc/profile root@slave1:/etc/profile
- 执行命令:
scp /etc/profile root@slave2:/etc/profile
- 切换到slave1虚拟机,让环境配置生效
- 切换到slave2虚拟机,让环境配置生效
(八)修改Kafka服务器配置文件
1、slave1虚拟机
- 进入Kafka配置目录,修改服务器配置文件
2、slave2虚拟机
- 进入Kafka配置目录,修改服务器配置文件
- 说明:
group.id
属性暂时不设置也是可以的
八、命令行操作Kafka
(一)启动ZooKeeper集群
- 依次在三个节点启动zk服务
(二)启动Kafka服务
1、master节点
- 进入Kafka安装目录,执行命令:
kafka-server-start.sh -daemon config/server.properties
- 查看启动的Kafka进程
2、slave1节点
- 进入Kafka安装目录,执行命令:
kafka-server-start.sh -daemon config/server.properties
- 查看启动的Kafka进程
3、slave2节点
- 进入Kafka安装目录,执行命令:
kafka-server-start.sh -daemon config/server.properties
- 查看启动的Kafka进程
(三)使用Kafka
1、创建主题
- 在master节点上创建一个拥有3个副本的主题
- 执行命令:
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 3 --topic hello-kafka
- 创建了名为
hello-kafka
的主题
2、查看主题
- 在master节点上查看主题 -
hello-kafka
- 执行命令:
kafka-topics.sh --describe --zookeeper master:2181 --topic hello-kafka
- 在slave1节点上查看主题 -
hello-kafka
- 执行命令:
kafka-topics.sh --describe --zookeeper master:2181 --topic hello-kafka
- 在slave2节点上查看主题 -
hello-kafka
- 执行命令:
kafka-topics.sh --describe --zookeeper master:2181 --topic hello-kafka
- AR: Assigned Replicas的缩写,是每个partition下所有副本(replicas)的统称
- ISR: In-Sync Replicas的缩写,是指副本同步队列,ISR是AR中的一个子集
3、生产消息
- 在slave1节点上启动生产者,基于主题
hello-kafka
生产消息 - 执行命令:
kafka-console-producer.sh --broker-list master:9092 --topic hello-kafka
,生产了3条消息
4、消费消息
- 在slave2节点上启动消费者,从头开始消费主题
hello-kafka
的消息 - 执行命令:
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic hello-kafka
- 注意:消费的顺序往往跟生产的顺序不一致
5、删除主题
- 在slave1节点上删除主题
hello-kafka
- 执行命令:
kafka-topics.sh --delete --zookeeper master:2181 --topic hello-kafka
- 主题
hello-kafka
只是打了删除标记,如果delete.topic.enable
没有设置为true
,那么这个删除操作不会有什么影响,主题依然存在。
- 修改三个节点的server.properties文件,添加一句
delete.topic.enable=true
- 重启三个节点的Kafka,再在slave1节点上执行删除主题的操作
- 执行命令:
kafka-topics.sh --delete --zookeeper master:2181 --topic hello-kafka
- 查看主题
hello-kafka
是否已被删除
- 可以看到,主题
hello-kafka
已被删除掉。
6、检测容错性
- 在master节点上创建一个拥有1一个分区和3个副本的主题
- 执行命令:
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic test
- 查看主题信息:执行命令
kafka-topics.sh --describe --zookeeper master:2181 --topic test
- 目前slave1(brokder.id=2)是Leader,副本存在三个服务器上,都是存活的
- 启动生产者,生产消息,执行命令:
kafka-console-producer.sh --broker-list master:9092 --topic test
- 启动消费者,消费消息,执行命令:
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test
- 目前,Kafka Leader在slave1虚拟机,将其kafka进程杀死。
- 此时,查看主题test信息,执行命令:
kafka-topics.sh --describe --zookeeper master:2181 --topic test
- 现在
slave2
节点成为Kafka的Leader,副本存放在三个服务器上,只有master和slave2节点还存活着 - 此时,重启slave1 (broker.id=2)的kafka服务,它能成为Leader吗?执行命令:
kafka-server-start.sh config/server.properties &
- 再次,查看主题test信息,执行命令:
kafka-topics.sh --describe --zookeeper master:2181 --topic test
- Leader依然是slave2(broker.id=3),而不是刚刚启动了Kafka的slave1。
九、Java程序操作Kafka
(一)准备工作
1、在三个节点进入Kafka安装目录
- 执行命令:
cd $KAFKA_HOME
2、在三个节点启动ZooKeeper服务
- 执行命令:
zkServer.sh start
3、在三个节点启动Kafka服务
- 执行命令:
kafka-server-start.sh config/server.properties &
4、在master节点上创建主题
- 执行命令:
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 3 --topic foo
- 查看主题
foo
,执行命令:kafka-topics.sh --describe --zookeeper master:2181 --topic foo
(二)Java程序操作Kafka
1、创建Maven项目
- 创建Maven项目 - KafkaDemo
2、添加Kafka依赖
- 在pom.xml文件里添加Kafka依赖
3、创建日志属性文件
- 在resources目录下创建log4j.properties
log4j.rootLogger=error, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/kafka.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4、创建包
- 创建
net.huawei.kafka
包
5、创建类
- 在
net.huawei.kafka
包里创建TestKafka
类
package net.huawei.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.*;
import org.junit.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 功能:操作Kafka
* 作者:华卫
* 日期:2022年04月20日
*/
public class TestKafka
private static String topic = "foo";//定义主题
@Test
public void sendMessage()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 1; i <= 10; i++)
ProducerRecord<String, String> message = new ProducerRecord<>(topic, "Kafka, I love you " + i + " times.");
producer.send(message);
System.out.println("消息发送成功~" + message.value());
producer.close以上是关于Spark基础学习笔记31:Kafka分布式消息系统的主要内容,如果未能解决你的问题,请参考以下文章