分布式实时消息队列Kafka

Posted 大数据Manor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式实时消息队列Kafka相关的知识,希望对你有一定的参考价值。

分布式实时消息队列Kafka(四)

知识点01:课程回顾

  1. Kafka中生产者的数据分区规则是什么?
    • 先判断是否指定了分区
      • 指定分区:写入对应分区
      • 没有指定:判断是否指定了Key
        • 指定了Key:按照Key的Hash分区
        • 没有指定Key:按照黏性分区
          • 特点:优先将所有数据构建一个Batch,提交到一个分区中,尽量保证数据分配均衡
    • 自定义分区规则
      • step1:开发一个类实现Partitioner
      • step2:实现一个partition方法
      • step3:生产者指定分区器
  2. Kafka中消费者消费数据的规则是什么?
    • 基础规则:每个消费者消费Topic分区的数据按照Offset进行消费
    • 第一次消费:根据属性决定:latest【最新位置】,earliest【最早位置:-1,请求 -1 + 1】
    • 从第二次开始:自动将上一次的位置 + 1向Kafka进行请求
    • 问题1:如果消费者故障了,重启消费者,如何能知道上一次消费的位置?
      • 原因:offset保存内存中,重启以后就没有了
  3. 如果消费者遇到故障,Kafka怎么保证不重复不丢失?
    • Kafka将每个消费者消费的offset存储在一个独立的Topic中:__consumer_offsets
    • 如果消费者故障,重启,从这个Topic查询上一次的offset + 1
    • 问题2:这个Topic中记录的offset是怎么来的?
      • 默认机制:根据时间周期由消费者自动提交
      • 导致问题:数据重复或者数据丢失问题
      • 解决问题:根据处理的结果来实现基于每个分区的手动提交
        • 消费一个分区、处理一个分区、处理成功,提交这个分区的offset
    • 工作中:不需要管使用手动提交还是自动提交?
      • 因为工作中不利用Kafka的存储来实现offset的恢复
      • 一般我们自己存储offset:将每个分区的offset存储在mysql、Zookeeper、Redis
      • 如果程序故障,从MySQL、Zookeeper、Redis中读取上一次的Offset
  4. Kafka中__consumer_offsets的作用是什么?
    • 用于Kafka自己实现保证消费者消费数据不丢失不重复的问题:记录所有消费者Offset

知识点02:课程目标

  1. 消费者组中多个消费者如何分配分区消费的问题?
    • 分配的规则是什么?
    • 范围分配:默认的分配规则
    • 轮询分配
    • 黏性分配:建议使用的分配规则
  2. Kafka中数据读写的流程
    • 分布式存储工具
      • Zookeeper:分布式协调服务工具
      • HDFS:分布式文件系统
      • Hbase:分布式NoSQL数据库
      • Kafka:分布式消息队列
    • 写的流程是什么?为什么写的很快?
    • 读的流程是什么?为什么读的很快?
    • Kafka如何自动清理过期的数据

知识点03:消费分配策略:基本规则及分配策略

  • 目标掌握Kafka消费者组中多个消费者的分配规则及问题

  • 路径

    • step1:消费者组中消费者分配消费分区的基本规则是什么?
    • step2:如果一个消费者组中有多个消费者,消费者组消费多个Topic,每个Topic有多个分区,如何分配?
  • 实施

    • 基本规则

      • 一个分区的数据只能有一个消费者消费

        • 不能实现多个消费者消费一个分区的数据

        • 原因:每个消费者在逻辑上属于同一个消费者组,但是物理上独立的消费者,无法获取彼此的offset的,不能共同协作的

        • 问题:怎么实现一个消费者组中有多个消费者?

          • 第一个消费者代码

            prop.set("group.id","test01")
            consumer01 = new KafkaConsumer(prop)
            
          • 第二个消费者代码

            prop.set("group.id","test01")
            consumer02 = new KafkaConsumer(prop)
            
          • ……

      • 一个消费者可以消费多个分区的数据

        • 消费者会挨个分区进行消费
    • 分配策略:决定了多个分区如何分配给多个消费者

      • 属性

        partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
        
      • 策略

        • RangeAssignor:范围分配,默认的分配策略

        • RoundRobinAssignor:轮询分配,常见于Kafka2.0之前的版本

          org.apache.kafka.clients.consumer.RoundRobinAssignor
          
        • StickyAssignor:黏性分配,2.0之后建议使用

          org.apache.kafka.clients.consumer.StickyAssignor
          
  • 小结

    • 基本规则
      • 一个分区只能由一个消费者来消费
      • 一个消费者可以消费多个分区
    • 分配策略
      • 范围分配
      • 轮询分配
      • 黏性分配

知识点04:消费分配策略:RangeAssignor

  • 目标掌握范围分配策略的规则及应用场景

  • 路径

    • step1:范围分配策略的规则是什么?
    • step2:范围分配的优缺点是什么?
  • 实施

    • 范围分配规则

      • Kafka中默认的分配规则
      • 每个消费者消费一定范围的分区,尽量的实现将分区均分给不同的消费者,如果不能均分,优先将分区分配给编号小的消费者
      • 6个分区:part0 ~ part5
        • 2个消费者
          • C1:part0 ~ part2
          • C2:part3 ~ part5
        • 4个消费者
          • C1:part0 part1
          • C2:part2 part3
          • C3:part4
          • C4:part5
    • 举例

      • 假设一:三个消费者,消费1个Topic,Topic1有6个分区

        消费者分区
        C1T1【0,1】
        C2T1【2,3】
        C3T1【4,5】

        image-20210330151724314

      • 假设二:三个消费者,消费1个Topic,Topic1有7个分区

        消费者分区
        C1T1【0,1,2】
        C2T1【3,4】
        C3T1【5,6】

        image-20210330151902302

      • 假设三:三个消费者,消费3个Topic,Topic1、Topic2、Topic3各有7个分区

        消费者分区
        C1T1【0,1,2】 T2【0,1,2】 T3【0,1,2】
        C2T1【3,4】 T2【3,4】 T3【3,4】
        C3T1【5,6】 T2【5,6】 T3【5,6】
        • 问题:负载不均衡
    • 范围分配优点

      • 如果Topic的个数比较少,分配会相对比较均衡
    • 范围分配缺点

      • 如果Topic的个数比较多,而且不能均分,导致负载不均衡问题
    • 应用:Topic个数少或者每个Topic都均衡的场景

  • 小结

    • 规则:每个消费者消费一定范围的分区,尽量做均分,如果不能均分,优先将分区分配给编号小的消费者
    • 应用:适合于Topic个数少或者每个Topic都能均分场景

知识点05:消费分配策略:RoundRobinAssignor

  • 目标掌握轮询分配策略的规则及应用场景

  • 路径

    • step1:轮询分配的规则是什么?
    • step2:轮询规则有什么优缺点?
  • 实施

    • 轮询分配的规则

      • 按照Topic的分区编号,轮询分配给每个消费者

      • 如果遇到范围分区的场景,能否均衡

        • 三个消费者,消费3个Topic,每个有7个分区

          消费者分区
          C1T1【0,3,6】 T2【2,5】 T3【1,4】
          C2T1【1,4】 T2【0,3,6】 T3【2,5】
          C3T1【2,5】 T2【1,4】 T3【0,3,6】
    • 举例

      • 假设一:三个消费者,消费2个Topic,每个Topic3个分区

        消费者分区
        C1T1【0】 T2【0】
        C2T1【1】 T2【1】
        C3T1【2】 T2【2】

        image-20210330152259434

      • 假设二:三个消费者,消费3个Topic,第一个Topic1个分区,第二个Topic2个分区,第三个Topic三个分区,消费者1消费Topic1,消费者2消费Topic1,Topic2,消费者3消费Topic1,Topic2,Topic3

        T1[0]
        T2[0,1]
        T3[0,1,2]
        
        消费者分区
        C1T1【0】
        C2T2【0】
        C3T2【1】 T3【0】 T3【1】 T3【2】

        image-20210330152315127

        • 问题:负载不均衡
    • 轮询分配的优点

      • 如果有多个消费者,消费的Topic都是一样的,实现将所有Topic的所有分区轮询分配给所有消费者,尽量的实现负载的均衡
      • 大多数的场景都是这种场景
    • 轮询分配的缺点

      • 遇到消费者订阅的Topic是不一致的,不同的消费者订阅了不同Topic,只能基于订阅的消费者进行轮询分配,导致整体消费者负载不均衡的
    • 应用场景:所有消费者都订阅共同的Topic,能实现让所有Topic的分区轮询分配所有的消费者

  • 小结

    • 规则:根据订阅关系,将订阅的Topic的分区排序轮询分配给订阅的消费者
    • 应用:订阅关系都是一致的

知识点06:消费分配策略:StickyAssignor

  • 目标掌握黏性分配策略的规则及应用场景

  • 路径

    • step1:黏性分配的规则是什么?
    • step2:黏性分配有什么特点?
  • 实施

    • 轮询分配的规则

      • 类似于轮询分配,尽量的将分区均衡的分配给消费者
    • 黏性分配的特点

      • 相对的保证的分配的均衡
    • 如果某个消费者故障,尽量的避免网络传输

    • 尽量保证原来的消费的分区不变,将多出来分区均衡给剩余的消费者

    • 举例

    • 假设一:三个消费者,消费2个Topic,每个Topic3个分区

      消费者分区
      C1T1【0】 T2【0】
      C2T1【1】 T2【1】
      C3T1【2】 T2【2】
      - 效果类似于轮询,比较均衡的分配,但底层实现原理有些不一样
      
    • 假设二:三个消费者,消费3个Topic,第一个Topic1个分区,第二个Topic2个分区,第三个Topic三个分区,消费者1消费Topic1,消费者2消费Topic1,Topic2,消费者3消费Topic1,Topic2,Topic3

      消费者分区
      C1T1【0】
      C2T2【0】 T2【1】
      C3T3【0】 T3【1】 T3【2】

      image-20210130114334365

  • 负载均衡的场景

    • 假设三:如果假设一中的C3出现故障

      • 假设一

        消费者分区
        C1T1【0】 T2【0】
        C2T1【1】 T2【1】
        C3T1【2】 T2【2】
      • 轮询:将所有分区重新分配

        消费者分区
        C1T1【0】 T1【2】 T2【1】
        C2T1【1】 T2【0】 T2【2】
      • 黏性:直接故障的分区均分给其他的消费者,其他消费者不用改变原来的分区,降低网络IO消耗

        消费者分区
        C1T1【0】 T2【0】 T1【2】
        C2T1【1】 T2【1】 T2【2】
    • 假设四:如果假设二中的C1出现故障

      • 假设二:轮询

        消费者分区
        C1T1【0】
        C2T2【0】
        C3T2【1】 T3【0】 T3【1】 T3【2】
      • 轮询负载均衡

        消费者分区
        C2T1【0】 T2【1】
        C3T2【0】T3【0】 T3【1】 T3【2】
      • 假设二:黏性

        消费者分区
        C1T1【0】
        C2T2【0】 T2【1】
        C3T3【0】 T3【1】 T3【2】
      • 黏性负载均衡

        消费者分区
        C2T2【0】 T2【1】 T1【0】
        C3T3【0】 T3【1】 T3【2】
  • 小结

    • 规则:尽量保证所有分配均衡,尽量保证每个消费者如果出现故障,剩余消费者依旧保留自己原来消费的分区
    • 特点
      • 分配更加均衡
      • 如果消费者出现故障,提高性能,避免重新分配,将多余的分区均衡的分配给剩余的消费者

知识点07:Kafka存储机制:存储结构

  • 目标掌握Kafka的存储结构设计及概念

  • 路径

    image-20210330213947441

  • 实施

    • Broker:物理存储节点,用于存储Kafka中每个分区的数据
    • Producer:生产者生产数据
    • Topic:逻辑存储对象,用于区分不同数据的的存储
    • Partition:分布式存储单元,一个Topic可以划分多个分区,每个分区可以分布式存储在不同的Broker节点上
    • Segment:分区段,每个分区的数据存储在1个或者多个Segment中,每个Segment由一对文件组成
      • Segment命名规则:最小Offset
  • 小结

    • Broker【物理进程节点】 | Topic【逻辑对象概念】

      • Partition:逻辑上分布式划分的概念、物理上存储数据的单元

        • 分区名称 = Topic名称 + 分区编号
        [root@node1 ~]# ll /export/server/kafka_2.12-2.4.1/logs/
        总用量 1212
        drwxr-xr-x 2 root root   4096 3月  31 08:59 bigdata-0
        drwxr-xr-x 2 root root    215 3月  31 11:23 bigdata01-1
        drwxr-xr-x 2 root root    215 3月  31 11:23 bigdata01-2
        
        • Segment

          -rw-r--r-- 1 root root     530080 3月  30 10:48 00000000000000000000.index
          -rw-r--r-- 1 root root 1073733423 3月  30 10:48 00000000000000000000.log
          
          -rw-r--r-- 1 root root     530072 3月  30 10:49 00000000000001060150.index
          -rw-r--r-- 1 root root 1073734280 3月  30 10:49 00000000000001060150.log
          
          -rw-r--r-- 1 root root     189832 3月  31 11:23 00000000000002120301.index
          -rw-r--r-- 1 root root  384531548 3月  30 10:49 00000000000002120301.log
          

知识点08:Kafka存储机制:写入过程

  • 目标掌握Kafka数据的写入过程

  • 路径

    • Kafka的数据是如何写入的?
    • 为什么Kafka写入速度很快?
  • 实施

    • step1:生产者生产每一条数据,将数据放入一个batch批次中,如果batch满了或者达到一定的时间,提交写入请求
    • step2:Kafka根据分区规则将数据写入分区,获取对应的元数据,将请求提交给leader副本所在的Broker
      • 元数据存储:Zookeeper中
    • step3:先写入这台Broker的PageCache中
      • Kafka也用了内存机制来实现数据的快速的读写:不同于Hbase的内存设计
        • Hbase:JVM堆内存
          • 所有Java程序都是使用JVM堆内存来实现数据管理
          • 缺点
            • GC:从内存中清理掉不再需要的数据,导致GC停顿,影响性能
            • 如果HRegionServer故障,JVM堆内存中的数据就丢失了,只能通过HLog恢复,性能比较差
        • Kafka:操作系统Page Cache
          • 选用了操作系统自带的缓存区域:PageCache
          • 由操作系统来管理所有内存,即使Kafka Broker故障,数据依旧存在PageCache中
    • step4:操作系统的后台的自动将页缓存中的数据SYNC同步到磁盘文件中:最新的Segment的.log中
      • 顺序写磁盘:不断将每一条数据追加到.log文件中
    • step5:其他的Follower到Leader中同步数据
  • 小结

    • Kafka的数据是如何写入的?
      • step1:生产者构建批次,提交给Kafka集群
    • step2:Kafka根据分区规则,检索元数据,将请求转发给Leader副本对应Broker
      • step3:先写Broker的PageCache
      • step4:后台实现将PageCache中顺序写同步到磁盘中:.log文件
      • step5:Follower同步Leader副本的数据
    • 为什么Kafka写入速度很快?
      • 应用了PageCache的页缓存机制
      • 顺序写磁盘的机制

知识点09:Kafka存储机制:Segment

  • 目标掌握Segment的设计及命名规则

  • 路径

    • 为什么要设计Segment?
    • Segment是如何实现的?
  • 实施

    • 设计思想

      • 加快查询效率
        • 通过将分区的数据根据Offset划分多个比较小的Segment文件
        • 在检索数据时,可以根据Offset快速定位数据所在的Segment
        • 加载单个Segment文件查询数据,可以提高查询效率
      • 减少删除数据IO
        • 删除数据时,Kafka以Segment为单位删除某个Segment的数据
        • 避免一条一条删除,增加IO负载,性能较差
    • Segment的基本实现

      • .log:存储真正的数据
      • .index:存储对应的.log文件的索引
    • Segment的划分规则:满足任何一个条件都会划分segment

      • 按照时间周期生成

        #如果达到7天,重新生成一个新的Segment
        log.roll.hours = 168
        
      • 按照文件大小生成

        #如果一个Segment存储达到1G,就构建一个新的Segment
        log.segment.bytes = 1073741824  
        
    • Segment文件的命名规则

      • 以当前文件存储的最小offset来命名的
      00000000000000000000.log			offset : 0 ~ 2344
      00000000000000000000.index
      
      00000000000000002345.log			offset : 2345 ~ 6788
      00000000000000002345.index
      
      00000000000000006789.log			offset : 6789 ~
      00000000000000006789.index
      
  • 小结

    • 为什么要设计Segment?
      • 加快查询效率:将数据划分到多个小文件中,通过offset匹配可以定位某个文件,从小数据量中找到需要的数据
      • 提高删除性能:以Segment为单位进行删除,避免以每一条数据进行删除,影响性能
    • Segment是如何实现的?
      • 组合:一对文件组件
        • .log
        • .index
      • 划分
        • 时间:7天
        • 大小:1G
      • 命名:以这个segment中存储的最小offset来命名

知识点10:Kafka存储机制:读取过程

  • 目标掌握Kafka数据的读取过程
  • 路径
    • Kafka数据是如何被读取的?
    • 为什么Kafka读取数据也很快?
  • 实施
    • step1:消费者根据Topic、Partition、Offset提交给Kafka请求读取数据
    • step2:Kafka根据元数据信息,找到对应的这个分区对应的Leader副本
    • step3:请求Leader副本所在的Broker,先读PageCache,通过零拷贝机制【Zero Copy】读取PageCache
    • step4:如果PageCache中没有,读取Segment文件段,先根据offset找到要读取的那个Segment
    • step5:将.log文件对应的.index文件加载到内存中,根据.index中索引的信息找到Offset在.log文件中的最近位置
      • 最近位置:index中记录的稀疏索引【不是每一条数据都有索引】
    • step6:读取.log,根据索引读取对应Offset的数据
  • 小结
    • Kafka数据是如何被读取的?
      • step1:消费者请求读取数据:Topic+Partition+Offset
      • step2:Kafka根据元数据,找到对应分区的leader副本进行检索
      • step3:先检索PageCache
        • 如果有,就通过零拷贝机制从PageCache中读取数据
        • 如果没有,就读取Segment文件段
      • step4:先根据Offset找到对应的Segment的一对文件
      • step5:先读index,找到offset对应的数据在.log文件中的最近位置
      • step6:根据位置,读取.log文件
    • 为什么Kafka读取数据也很快?
      • 优先基于PageCache内存的读取,使用零拷贝机制
      • 按照Offset有序读取每一条
      • 构建Segment文件段
      • 构建index索引

知识点11:Kafka存储机制:index索引设计

  • 目标掌握Kafka的Segment中index的索引设计

  • 路径

    • .index文件中的索引的内容是什么?
    • 查询数据时如何根据索引找到对应offset的数据?
  • 实施

    • 索引类型

      • 全量索引:每一条数据,都对应一条索引

      • 稀疏索引:部分数据有索引,有一部分数据是没有索引的

        • 优点:减少了索引存储的数据量加快索引的索引的检索效率

        • 什么时候生成一条索引?

          #.log文件每增加4096字节,在.index中增加一条索引
          

        log.index.interval.bytes=4096

        
        - Kafka中选择使用了稀疏索引
        
        
    • 索引内容

      • 两列
        • 第一列:这条数据在这个文件中的位置
        • 第二列:这条数据在文件中的物理偏移量
      是这个文件中的第几条,数据在这个文件中的物理位置
      1,0				--表示这个文件中的第一条,在文件中的位置是第0个字节开始
      3,497			--表示这个文件中的第三条,在文件中的位置是第497个字节开始
      
      image-20210330220753258
      • 这个文件中的第1条数据是这个分区中的第368770条数据,offset = 368769
    • 检索数据流程

      • step1:先根据offset计算这条offset是这个文件中的第几条
      • step2:读取.index索引,根据二分检索,从索引中找到离这条数据最近偏小的位置
      • step3:读取.log文件从最近位置读取到要查找的数据
    • 举例

      • 需求:查找offset = 368772

      • step1:计算是文件中的第几条

        368772 - 368769 = 3 + 1 = 4,是这个文件中的第四条数据
        
      • step2:读取.index索引,找到最近位置

        3,497
        
      • step3:读取.log,从497位置向后读取一条数据,得到offset = 368772的数据

    • 问题:为什么不直接将offset作为索引的第一列?

      • 因为Offset越来越大,导致索引存储越来越大,空间占用越多,检索索引比较就越麻烦
  • 小结

    • .index文件中的索引的内容是什么?
      • 第一列:这条数据是这个文件第几条数据
      • 第二列:存储这条数据在文件中的物理偏移量
    • 查询数据时如何根据索引找到对应offset的数据?
      • step1:先根据给定的offset计算这个offset是这个文件的第几条
      • step2:读取index索引,找到最近物理位置
      • step3:读取.log文件,找到对应的数据

知识点12:Kafka数据清理规则

  • 目标了解Kafka中数据清理的规则

  • 路径

    • Kafka用于实现实时消息队列的数据缓存,不需要永久性的存储数据,如何将过期数据进行清理?
  • 实施

    • 属性配置

      #开启清理
      log.cleaner.enable = true
      #清理规则
      log.cleanup.policy = delete | compact
      
    • 清理规则:delete

      • 基于存活时间规则:最常用的方式

        log.retention.ms
        log.retention.minutes
        log.retention.hours=168/7天
        
      • 基于文件大小规则

        #删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不使用这种规则
        log.retention.bytes = -1
        
      • 基于offset消费规则

        • 功能:将明确已经消费的offset的数据删除

        • 如何判断已经消费到什么位置

          • step1:编写一个文件offset.json

            {
              "partitions":[
                 {"topic": "bigdata", "partition": 0,"offset": 2000},
                 {"topic": "bigdata", "partition": 1,"offset": 2000}
               ],
               "version":1
            }
            
          • step2:指定标记这个位置

            kafka-delete-records.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --offset-json-file offset.json 
            
    • 清理规则:compact

    • 功能:将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有Key,根据Key来判断是否重复

      image-20210330222244406
  • 小结

    • Kafka用于实现实时消息队列的数据缓存,不需要永久性的存储数据,如何将过期数据进行清理?

        {
          "partitions":[
             {"topic": "bigdata", "partition": 0,"offset": 2000},
             {"topic": "bigdata", "partition": 1,"offset": 2000}
           ],
           "version":1
        }
        ```
      
      - step2:指定标记这个位置
      
        ```shell
        kafka-delete-records.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --offset-json-file offset.json 
        ```
      
    • 清理规则:compact

    • 功能:将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有Key,根据Key来判断是否重复

      image-20210330222244406
  • 小结

    • Kafka用于实现实时消息队列的数据缓存,不需要永久性的存储数据,如何将过期数据进行清理?
    • delete方案:根据时间定期的清理过期的Segment文件

以上是关于分布式实时消息队列Kafka的主要内容,如果未能解决你的问题,请参考以下文章

分布式实时消息队列Kafka

分布式实时消息队列Kafka

分布式实时消息队列Kafka

Kafka 消息队列系列之分布式消息队列Kafka

分布式消息队列之kafka

消息队列和缓存的区别