Spark基础学习笔记31:Kafka分布式消息系统

Posted howard2005

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark基础学习笔记31:Kafka分布式消息系统相关的知识,希望对你有一定的参考价值。

文章目录

零、本讲学习目标

  1. 掌握Kafka的架构原理
  2. 掌握Kafka的主题、分区、消费者组的概念
  3. 掌握Kafka的数据存储机制
  4. 掌握Kafka集群环境的搭建
  5. 掌握Kafka Java API的操作
  6. 掌握Kafka生产者拦截器的使用

一、Kafka概述

(一)什么是Kafka

  • Spark生态体系中,Kafka占有非常重要的位置。Kafka是一个使用Scala语言编写的基于ZooKeeper的高吞吐量低延迟的分布式发布与订阅消息系统,它可以实时处理大量消息数据以满足各种需求,比如基于Hadoop的批处理系统,低延迟的实时系统等。即便使用非常普通的硬件,Kafka每秒也可以处理数百万条消息、其延迟最低只有几毫秒。
  • 在实际开发中,Kafka常常作为Spark Streaming的实时数据源,Spark Streaming从Kafka`中读取实时消息进行处理,保证了数据的可靠性与实时性。二者是实时消息处理系统的重要组成部分。
  • 那么Kafka到底是什么?简单来说,Kafka是消息中间件的一种。举一个生产者与消费者的例子:生产者生产鸡蛋,消费者消费鸡蛋。假设消费者消费鸡蛋的时候噎住了(系统宕机了),而生产者还在生产鸡蛋,那么新生产的鸡蛋就丢失了;再比如,生产者1秒钟生产100个鸡蛋(大交易量的情况),而消费者1秒钟只能消费50个鸡蛋,那过不了多长时间,消费者就吃不消了(消息堵塞,最终导致系统超时),导致鸡蛋又丢失了。这个时候我们放个篮子在生产者与消费者中间,生产者生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,这个篮子就相当于Kafka
  • 上述例子中的鸡蛋则相当于Kafka中的消息,篮子相当于存放消息的消息队列,也就是Kafka集群。当篮子满了,鸡蛋放不下了,这时再加几个篮子,就是Kafka集群扩容

(二)Kafka中的基本概念

1、消息(Message)

  • Kafka的数据单元被称为消息。可以把消息看成是数据库里的一行数据或一条记录。为了提高效率,消息可以分组传输,每一组消息就是一个批次,分成批次传输可以减少网络开销。但是批次越大,单位时间内处理的消息就越大,因此要在吞吐量和时间延迟之间做出权衡。

2、服务器节点(Broker)

  • Kafka集群包含一个或多个服务器节点,一个独立的服务器节点被称为Broker(经纪人)。

3、主题(Topic)

  • 每条发布到Kafka集群的消息都有一个类别,这个类别被称为主题。在物理上,不同主题的消息分开存储;在逻辑上,一个主题的消息虽然保存于一个或多个Broker上,但用户只需指定消息的主题即可生产或消费消息,而不必关心消息存于何处。主题在逻辑上可以被认为是一个队列。每条消息都必须指定它的主题,可以简单理解为必须指明把这条消息放进哪个队列里。

4、分区(Partition)

  • 为了使Kafka的吞吐率可以水平扩展,物理上把主题分成一个或多个分区。创建主题时可指定分区数量。每个分区对应于一个文件夹,该文件夹下存储该分区的数据和索引文件。

5、生产者(Producer)

  • 负责发布消息到Kafka的Broker,实际上属于Broker的一种客户端。生产者负责选择哪些消息应该分配到哪个主题内的哪个分区。默认生产者会把消息均匀地分布到特定主题的所有分区上,但在某些情况下,生产者会将消息直接写到指定的分区。

6、消费者(Consumer)

  • 从Kafka的Broker上读取消息的客户端。读取消息时需要指定读取的主题,通常消费者会订阅一个或多个主题,并按照消息生成的顺序读取它们。

二、Kafka架构

(一)Kafka消息传递流程图

  • 生产者将消息发送给Kafka集群,同时Kafka集群将消息转发给消费者

(二)Kafka集群架构图

  • 在Kafka中,客户端和服务器之间的通信是通过一个简单的、高性能的、与语言无关的TCP协议完成的。该协议进行了版本控制,并与旧版本保持向后兼容。Kafka不仅提供Java客户端,也提供其他多种语言的客户端。
  • 一个典型的Kafka集群中包含若干生产者(数据可以是Web前端产生的页面内容或者服务器日志等)、若干Broker、若干消费者(可以是Hadoop集群、实时监控程序、数据仓库或其他服务)以及一个ZooKeeper集群。ZooKeeper用于管理和协调Broker。当Kafka系统中新增了Broker或者某个Broker故障失效时,ZooKeeper将通知生产者和消费者。生产者和消费者据此开始与其他Broker协调工作。
  • Kafka集群架构图
  • 生产者使用Push模式将消息发送到Broker,而消费者使用Pull模式从Broker订阅并消费消息

三、主题与分区

(一)主题与分区的关系

  • Kafka通过主题对消息进行分类,一个主题可以分为多个分区,且每个分区可以存储于不同的Broker上,也就是说,一个主题可以横跨多个服务器。
  • 如果你对HBase的集群架构比较了解,用HBase数据库做类比,可以将主题看作HBase数据库中的一张表,而分区则是将表数据拆分成多个部分,即HRegion。不同的HRegion可以存储于不同的服务器上,而分区也是如此。

(二)主题分区的好处

  • 对主题进行分区的好处:允许主题消息规模超出一台服务器的文件大小上限。因为一个主题可以有多个分区,且可以存储在不同的服务器上,当一个分区的文件大小超出了所在服务器的文件大小上限时,可以动态添加其他分区,因此可以处理无限量的数据
  • Kafka会为每个主题维护一个分区日志,记录各个分区的消息存放情况。消息以追加的方式写入每个分区的尾部,然后以先入先出的顺序进行读取。由于一个主题包含多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证单个分区内消息的顺序
  • 当一条消息被发送到Broker时,会根据分区规则被存储到某个分区里。如果分区规则设置得合理,所有消息将被均匀地分配到不同的分区里,这样就实现了水平扩展。如果一个主题的消息都存放到一个文件中,那么该文件所在的Broker的I/O将成为主题的性能瓶颈,而分区正好解决了这个问题。

(三)分区消息的读写

  • 分区中的每个记录都被分配了一个偏移量(Offset),偏移量是一个连续递增的整数值,它唯一标识分区中的某个记录。而消费者只需保存该偏移量即可,当消费者客户端向Broker发起消息请求时需要携带偏移量。
  • 例如,消费者向Broker请求主题test的分区0中的偏移量从20开始的所有消息以及主题test的分区1中的偏移量从35开始的所有消息。当消费者读取消息后,偏移量会线性递增。当然,消费者也可以按照任意顺序消费消息,比如读取已经消费过的历史消息(将偏移量重置到之前版本)。
  • 此外,消费者还可以指定从某个分区中一次最多返回多少条数据,防止一次返回数据太多而耗尽客户端的内存。

(四)消息保留的策略

  • 对于已经发布的消息,无论这些消息是否被消费,Kafka都将会保留一段时间,具体的保留策略有两种:根据时间保留(例如7天)和根据消息大小保留(例如1GB),可以进行相关参数配置,选择具体策略。当消息数量达到配置的策略上限时,Kafka就会为节省磁盘空间而将旧消息删除。
  • 例如,设置消息保留两天,则两天内该消息可以随时被消费,但两天后该消息将被删除。Kafka的性能对数据大小不敏感,因此保留大量数据毫无压力。
  • 每个主题也可以配置自己的保留策略,可以根据具体的业务进行设置。例如,用于跟踪用户活动的数据可能需要保留几天,而应用程序的度量指标可能只需要保留几个小时。

四、分区副本

(一)分区的复制示意图

  • 在Kafka集群中,为了提高数据的可靠性,同一个分区可以复制多个副本分配到不同的Broker,这种方式类似于HDFS中的副本机制。如果其中一个Broker宕机,其他Broker可以接替宕机的Broker,不过生产者和消费者需要重新连接到新的Broker。

(二)分区副本分为两种类型

  • Kafka每个分区的副本都被分为两种类型:领导者副本跟随者副本。领导者副本只有一个,其余的都是跟随者副本。所有生产者和消费者都向领导者副本发起请求,进行消息的写入与读取,而跟随者副本并不处理客户端的请求,它唯一的任务是从领导者副本复制消息,以保持与领导者副本数据及状态的一致。如果领导者副本发生崩溃,就会从其余的跟随者副本中选出一个作为新的领导者副本。
  • 领导者与跟随者在Kafka集群中分布图

(三)跟随者如何与领导者保持同步

1、提出问题

  • 既然跟随者副本会从领导者副本那里复制消息,那么这种复制是领导者主动向跟随者发起Push(推送)请求还是跟随者向领导者发起Pull(拉取)请求?

2、分析问题

  • 跟随者为了与领导者保持同步,会周期性地向领导者发起获取数据的请求(Pull),这种请求与消费者读取消息发送的请求是一样的。请求消息里包含跟随者想要获取消息的偏移量,偏移量的值随着每次请求进行递增。领导者从跟随者请求的偏移量可以知道消息复制的进度。

3、解决问题

  • 领导者与跟随者之间的消息复制什么时候才认为是成功的呢? 是同步复制还是异步复制?如果个别跟随者由于网络问题导致消息没有复制完成,是否允许消费者对消息进行读取?
  • Kafka的消息复制是以分区为单位的,既不是完全的同步复制,又不是完全的异步复制,而是基于ISR(In-Sync Replica) 的动态复制方案。
  • 领导者会维护一个需要与其保持同步的副本列表(包括领导者自己),该列表称为ISR,且每个分区都会有一个ISR。如果在一定时间内(可以通过参数replica.lag.time.max.ms进行配置),跟随者没有向领导者请求新的消息(可能由于网络问题),该跟随者将被认为是不同步的,领导者会从ISR中将其移除,从而避免因跟随者的请求速度过慢而拖慢整体速度。而当跟随者重新与领导者保持同步,领导者会将其再次加入ISR中。当领导者失效时,也不会选择ISR中不存在的跟随者作为新的领导者。
  • ISR的列表数据保存在ZooKeeper中,每次ISR改变后,领导者都会将最新的ISR同步到ZooKeeper中。
  • 每次消息写入时,只有ISR中的所有跟随者都复制完毕,领导者才会将消息写入状态置为Commit(写入成功),而只有状态置为Commit的消息才能被消费者读取。从消费者的角度来看,要想成功读取消息,ISR中的所有副本必须处于同步状态,从而提高数据的一致性。
  • 可以通过设置min.insync.replicas参数指定ISR的最小数量,默认为1,即ISR中的所有跟随者都可以被移除,只剩下领导者。但是在这种情况下,如果领导者失效,由于ISR中没有跟随者,因此该分区将不可用。适当增加参数min.insync.replicas的值将提高系统的可用性
  • 以上是站在消费者的角度来看ISR,即ISR中所有的副本都成功写入消息后,才允许消费者读取。那么对于生产者来说,在消息发送完毕后,是否需要等待ISR中所有副本成功写入才认为消息发送成功?
  • 生产者通过ZooKeeper向领导者副本所在的Broker发送消息,领导者收到消息后需要向生产者返回消息确认收到的通知。而实际上,领导者并不需要等待ISR中的所有副本都写入成功才向生产者进行确认。
  • 在生产者发送消息时,可以对acks参数进行配置,该参数指定了对消息写入成功的界定。生产者可以通过acks参数指定需要多少个副本写入成功才视为该消息发送成功。acks参数有3个值,分别为1all0。若acks=1(默认值),只要领导者副本写入成功,生产者就认为写入成功;acks=all,则需要ISR中的所有副本都写入成功,生产者才能认为写入成功;若acks=0,则生产者将消息发送出去后,立即认为该消息发送成功,不需要等待Broker的响应(而实际上该消息可能发送失败)。因此,需要根据实际业务需求来设置acks的值。
  • 对于Broker来说,Broker会通过acks的值来判断何时向生产者返回响应。在消息被写入分区的领导者副本后,Broker开始检查acks参数值,若acks的值为10,则Broker立即返回响应给生产者;若acks的值为all,则请求会被保存在缓冲区中,直到领导者检测到所有跟随者副本都已成功复制了消息,才会向生产者返回响应。

五、消费者组

(一)消费者组概念

  • 消费者组(Consumer Group)实际上就是一组消费者的集合。每个消费者属于一个特定的消费者组(可为每个消费者指定组名称,消费者通过组名称对自己进行标识,若不指定组名称,则属于默认的组)。

(二)消费者组模式的特点

  • 传统消息处理有两种模式:队列模式发布订阅模式。队列模式是指消费者可以从一台服务器读取消息,并且每个消息只被其中一个消费者消费;发布订阅模式是指消息通过广播方式发送给所有消费者。而Kafka提供了消费者组模式,同时具备这两种(队列和发布订阅)模式的特点。

(三)消费者组与分区的关系

  • Kafka规定,同一消费者组内不允许多个消费者消费同一分区的消息,而不同的消费者组可以同时消费同一分区的消息。也就是说,分区与同一个消费者组中的消费者的对应关系是多对一,而不允许一对多。举个例子,如果同一个应用有100台机器,这100台机器属于同一个消费者组,同一条消息在100台机器中就只有一台能得到。如果另一个应用也需要同时消费同一个主题的消息,就需要新建一个消费者组并消费同一个主题的消息。
  • 避免组内消费者竞争同一个分区的消息,但是不同组的消费者就可以竞争同一个分区的消息,比如分区0的消息被消费组A的消费者1与消费组B的消费者3共同消费,但是分区0的消息不会被消费组A里的消费者1与消费者2同时消费,也绝对不会被消费组B的消费者3、消费者4、消费者5和消费者6共同消费。

(四)消费者组的两种特殊情况

  • 每条消息发送到主题后,只能发送给某个消费者组中的唯一一个消费者实例(可以是同一台服务器上的不同进程,也可以是不同服务器上的进程)。

1、所有消费者实例属于同一分组

  • 如果所有消费者实例属于同一分组(有相同的分组名),该过程就是传统的队列模式,即同一消息只有一个消费者能得到

2、所有消费者都不属于同一分组

  • 如果所有消费者都不属于同一分组,该过程就是发布订阅模式,即同一消息每个消费者都能得到。

(四)同一个分区只能被组中一个消费者消费

  • 从Kafka架构图
  • 属于同一个消费者组的3个消费者共同读取一个主题,其中的两个消费者各自读取一个分区,而另一个消费者同时读取了两个分区。消费者组保证了同一个分区只能被组中的一个消费者进行消费。

六、数据存储机制

(一)分区文件夹

  • Kafka中的消息由主题进行分类,而主题在物理上又分为多个分区。那么分区是怎么存储数据的呢?
  • 假设在Broker中有一个名为sampletopic的主题,该主题被分为4个分区,则在Kafka消息存储目录(配置文件server.properties中属性log.dirs指定的目录)中会生成以下4个文件夹,且这4个文件夹可能分布于不同的Broker中。在Kafka数据存储中,每个分区的消息数据存储于一个单独的文件夹中,分区文件夹的命名规则为“主题名-分区编号”,分区编号从0开始,依次递增。
sampletopic-0
sampletopic-1
sampletopic-2
sampletopic-3

(二)segment(段)

1、segement概念

  • 一个分区在物理上由多个segment(段)组成。segment是Kafka数据存储的最小单位。每个分区的消息数据会被分配到多个segment文件中,这种将分区细分为segment的方式,方便了旧消息(旧segment)的删除和清理,达到及时释放磁盘空间的效果。

2、segment文件的构成

  • segment文件由两部分组成:索引文件(后缀为.index)和数据文件(后缀为.log),这两个文件一一相应,且成对出现。索引文件存储元数据,数据文件存储实际消息,索引文件中的元数据指向对应数据文件中消息的物理偏移地址。

3、segment文件的命名

  • segment文件的命名由20位数字组成,同一分区中的第一个segment文件的命名编号从0开始,下一个segment文件的命名编号为上一个segment文件的最后一条消息的offset值。编号长度不够以0补充。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000258330.index
00000000000000258330.log

4、segement索引文件与数据文件的对应关系

  • 索引文件的左侧为消息在该文件中的顺序(第几条消息),右侧为消息在数据文件中对应的偏移地址(实际物理存储位置)。可以看到,并不是所有的消息都会在索引文件中建立索引,而是采用每隔一定字节的数据建立一条索引,这种索引方式被称为稀疏索引。采用稀疏索引避免了索引文件占用过多的空间,从而提高了索引的查找速度。但缺点是,没有建立索引的消息不能一次定位到其在数据文件的物理位置,而是通过“二分法”定位到与其最近的消息的位置(小于等于需要查找的消息位置中的最大值),然后顺序进行扫描(此时顺序扫描的范围已经被缩小了),直到找到需要查找的消息。

5、消费者通过offset值查找消息

  • 查找offset=170413的消息,Kafka的查找步骤

(1)通过offset值定位到索引文件

  • 索引文件00000000000000170410.index的起始offset值为170410+1=170411,索引文件00000000000000258330.index的起始offset值为258330+1=258331。根据二分查找法,可以快速定位到offset值为170413的消息的索引文件为00000000000000170410.index

(2)通过索引文件查询消息物理偏移地址

  • 首先根据offset值查找到消息顺序,offset值为170413的消息在索引文件00000000000000170410.index中的消息顺序为170413-170410=3。然后根据消息顺序查找(二分法)到消息在数据文件的物理偏移地址,消息顺序为3的消息对应的物理偏移地址为256

(3)通过物理偏移地址定位到消息内容

  • 根据查找到的物理偏移地址,到数据文件00000000000000170410.log中查找对应的消息内容(消息内容3)。

七、搭建Kafka集群

(一)从官网下载Kafka

(二)上传到master虚拟机

  • 将kafka安装包上传到master虚拟机/opt目录

(三)将Kafka安装包解压到指定目录

  • 执行命令:tar -zxvf kafka_2.11-2.0.0.tgz -C /usr/local

(四)配置Kafka环境变量

  • 执行命令:vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.11-2.0.0
export PATH=$KAFKA_HOME/bin:$PATH
  • 存盘退出,执行命令source /etc/profile,让环境变量生效

(五)配置Kafka服务器属性文件

  • 进入Kafka配置目录,执行命令:vim server.properties
  • 在分布式环境中,建议至少修改以下配置项(若文件中无此配置项,则需要新增),其他配置项可以根据具体项目环境进行调优。
broker.id=1 # 当前server编号
num.partitions=3 # 配置分区数
default.replication.factor=2 # 消息备份副本数
listeners=PLAINTEXT://master:9092 #Socket监听的地址
log.dirs=/usr/local/kafka_2.11-2.0.0/kafka-logs # 日志存储目录
zookeeper.connect=master:2181 # 连接zookeeper
group.id=hw-kafka # 配置消费组id
  • broker.id:每一个Broker都需要有一个标识符,使用broker.id表示,类似于ZooKeeper的myid。broker.id必须是一个全局(集群范围)唯一的整数值,即集群中每个Kafka服务器的broker.id的值不能相同。
  • num.partitions:每个主题的分区数量,默认是1。需要注意的是,可以增加分区的数量,但是不能减少分区的数量。
  • default.replication.factor:消息备份副本数,默认为1,即不进行备份。
    listeners:Socket监听的地址,用于Broker监听生产者和消费者请求,格式为listeners= security_protocol://host_name:port。如果没有配置该参数,就默认通过Java的API(java.net.InetAddress.getCanonicalHostName())来获取主机名,端口默认为9092,建议进行显式配置,避免多网卡时解析有误。
  • log.dirs:Kafka消息数据的存储位置,可以指定多个目录,以逗号分隔。
  • zookeeper.connect:ZooKeeper的连接地址。该参数是用逗号分隔的一组格式为hostname:port/path的列表,其中hostname为ZooKeeper服务器的主机名或IP地址;port是ZooKeeper客户端连接端口;/path是可选的ZooKeeper路径,如果不指定,就默认使用ZooKeeper根路径。

(六)将master虚拟机上的Kafka分发到slave1和slave2虚拟机

  • 执行命令:scp -r $KAFKA_HOME root@slave1:$KAFKA_HOME
  • 执行命令:scp -r $KAFKA_HOME root@slave2:$KAFKA_HOME

(七)将master虚拟机上的环境配置文件分发到slave1和slave2虚拟机

  • 执行命令:scp /etc/profile root@slave1:/etc/profile
  • 执行命令:scp /etc/profile root@slave2:/etc/profile
  • 切换到slave1虚拟机,让环境配置生效
  • 切换到slave2虚拟机,让环境配置生效

(八)修改Kafka服务器配置文件

1、slave1虚拟机

  • 进入Kafka配置目录,修改服务器配置文件

2、slave2虚拟机

  • 进入Kafka配置目录,修改服务器配置文件

  • 说明:group.id属性暂时不设置也是可以的

八、命令行操作Kafka

(一)启动ZooKeeper集群

  • 依次在三个节点启动zk服务


(二)启动Kafka服务

1、master节点

  • 进入Kafka安装目录,执行命令:kafka-server-start.sh -daemon config/server.properties
  • 查看启动的Kafka进程

2、slave1节点

  • 进入Kafka安装目录,执行命令:kafka-server-start.sh -daemon config/server.properties
  • 查看启动的Kafka进程

3、slave2节点

  • 进入Kafka安装目录,执行命令:kafka-server-start.sh -daemon config/server.properties
  • 查看启动的Kafka进程

(三)使用Kafka

1、创建主题

  • 在master节点上创建一个拥有3个副本的主题
  • 执行命令:kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 3 --topic hello-kafka
  • 创建了名为hello-kafka的主题

2、查看主题

  • 在master节点上查看主题 - hello-kafka
  • 执行命令:kafka-topics.sh --describe --zookeeper master:2181 --topic hello-kafka
  • 在slave1节点上查看主题 - hello-kafka
  • 执行命令:kafka-topics.sh --describe --zookeeper master:2181 --topic hello-kafka
  • 在slave2节点上查看主题 - hello-kafka
  • 执行命令:kafka-topics.sh --describe --zookeeper master:2181 --topic hello-kafka
  • AR: Assigned Replicas的缩写,是每个partition下所有副本(replicas)的统称
  • ISR: In-Sync Replicas的缩写,是指副本同步队列,ISR是AR中的一个子集

3、生产消息

  • 在slave1节点上启动生产者,基于主题hello-kafka生产消息
  • 执行命令:kafka-console-producer.sh --broker-list master:9092 --topic hello-kafka,生产了3条消息

4、消费消息

  • 在slave2节点上启动消费者,从头开始消费主题hello-kafka的消息
  • 执行命令:kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic hello-kafka
  • 注意:消费的顺序往往跟生产的顺序不一致

5、删除主题

  • 在slave1节点上删除主题hello-kafka
  • 执行命令:kafka-topics.sh --delete --zookeeper master:2181 --topic hello-kafka
  • 主题hello-kafka只是打了删除标记,如果delete.topic.enable没有设置为true,那么这个删除操作不会有什么影响,主题依然存在。
  • 修改三个节点的server.properties文件,添加一句delete.topic.enable=true


  • 重启三个节点的Kafka,再在slave1节点上执行删除主题的操作
  • 执行命令:kafka-topics.sh --delete --zookeeper master:2181 --topic hello-kafka
  • 查看主题hello-kafka是否已被删除
  • 可以看到,主题hello-kafka已被删除掉。

6、检测容错性

  • 在master节点上创建一个拥有1一个分区和3个副本的主题
  • 执行命令:kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic test
  • 查看主题信息:执行命令kafka-topics.sh --describe --zookeeper master:2181 --topic test
  • 目前slave1(brokder.id=2)是Leader,副本存在三个服务器上,都是存活的
  • 启动生产者,生产消息,执行命令:kafka-console-producer.sh --broker-list master:9092 --topic test
  • 启动消费者,消费消息,执行命令:kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test
  • 目前,Kafka Leader在slave1虚拟机,将其kafka进程杀死。
  • 此时,查看主题test信息,执行命令:kafka-topics.sh --describe --zookeeper master:2181 --topic test
  • 现在slave2节点成为Kafka的Leader,副本存放在三个服务器上,只有master和slave2节点还存活着
  • 此时,重启slave1 (broker.id=2)的kafka服务,它能成为Leader吗?执行命令:kafka-server-start.sh config/server.properties &
  • 再次,查看主题test信息,执行命令:kafka-topics.sh --describe --zookeeper master:2181 --topic test
  • Leader依然是slave2(broker.id=3),而不是刚刚启动了Kafka的slave1。

九、Java程序操作Kafka

(一)准备工作

1、在三个节点进入Kafka安装目录

  • 执行命令:cd $KAFKA_HOME


2、在三个节点启动ZooKeeper服务

  • 执行命令:zkServer.sh start


3、在三个节点启动Kafka服务

  • 执行命令:kafka-server-start.sh config/server.properties &


4、在master节点上创建主题

  • 执行命令:kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 3 --topic foo
  • 查看主题foo,执行命令: kafka-topics.sh --describe --zookeeper master:2181 --topic foo

(二)Java程序操作Kafka

1、创建Maven项目

  • 创建Maven项目 - KafkaDemo

2、添加Kafka依赖

  • 在pom.xml文件里添加Kafka依赖

3、创建日志属性文件

  • 在resources目录下创建log4j.properties
log4j.rootLogger=error, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/kafka.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4、创建包

  • 创建net.huawei.kafka

5、创建类

  • net.huawei.kafka包里创建TestKafka
package net.huawei.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.*;
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 功能:操作Kafka
 * 作者:华卫
 * 日期:2022年04月20日
 */
public class TestKafka 

    private static String topic = "foo";//定义主题

    @Test
    public void sendMessage() 
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i <= 10; i++) 
            ProducerRecord<String, String> message = new ProducerRecord<>(topic, "Kafka, I love you " + i + " times.");
            producer.send(message);
            System.out.println("消息发送成功~" + message.value());
        
        producer.close以上是关于Spark基础学习笔记31:Kafka分布式消息系统的主要内容,如果未能解决你的问题,请参考以下文章

Kafka学习之路

Kafka学习笔记

Kafka学习笔记

学习笔记Kafka—— Kafka简介

学习笔记Kafka—— Kafka简介

Kafka基础整理