分布式实时消息队列Kafka

Posted 大数据Manor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式实时消息队列Kafka相关的知识,希望对你有一定的参考价值。

分布式实时消息队列Kafka(三)

知识点01:课程回顾

  1. 请简述Kafka的集群架构及角色功能?

    • Kafka:分布式主从架构
      • 主: Controller:管理集群中的Topic、分区、副本选举
      • 从:Broker:对外接受读写请求,存储分区数据
    • Zookeeper
      • 辅助选举Active的主节点:Crontroller
      • 存储核心元数据
  2. 请简述Kafka中Topic管理的脚本及常用选项参数?

    • 使用命令行中的脚本命令实现管理
    • 脚本:kafka-topics.sh
    • 常用选项
      • –topic
      • –create
      • –list
      • –describe
      • –delete
      • –alter:调整Topic的配置
      • –bootstrap-server / --broker-list
  3. 请简述如何使用Kafka Simple Java API 实现数据生产?描述具体的类及方法

    • step1:构建生产者连接对象:KafkaProducer

      • 需要配置对象:管理配置,例如连接地址:Properties
    • step2:KafkaProducer:send:生产数据到Kafka中

      • 需要构建一个生产的数据对象:ProducerRecord

      • ProducerRecord(Topic,Value)

      • ProducerRecord(Topic,Key,Value)

      • ProducerRecord(Topic,Partition,Key,Value)

  4. 请简述如何使用Kafka Simple Java API 实现数据消费?描述具体的类及方法

    • step1:构建消费者连接对象:KafkaConsumer

      • 需要配置对象:管理配置,例如连接地址:Properties
    • step2:消费者需要订阅Topic

      • KafkaConsumer:subscribe(List)
    • step3:消费数据

      • KafkaConsumer:poll

      • ConsumerRecords:拉取到的所有数据

      • ConsumerRecord:消费到的每一条数据

        • topic

        • partition

        • offset

        • key

        • value

  5. 请简述Kafka生产数据时如何保证生产数据不丢失?

    • acks:返回的确认,当接收方收到数据以后,就会返回一个确认的消息
    • 生产者向Kafka生产数据,根据配置要求Kafka返回ACK
      • ack=0:生产者不管Kafka有没有收到,直接发送下一条
        • 优点:快
        • 缺点:容易导致数据丢失,概率比较高
      • ack=1:生产者将数据发送给Kafka,Kafka等待这个分区leader副本写入成功,返回ack确认,生产者发送下一条
        • 优点:性能和安全上做了平衡
        • 缺点:依旧存在数据丢失的概率,但是概率比较小
      • ack=all/-1:生产者将数据发送给Kafka,Kafka等待这个分区所有副本全部写入,返回ack确认,生产者发送下一条
        • 优点:数据安全
        • 缺点:慢
    • 如果Kafka没有返回ACK怎么办?
      • 生产者会等待Kafka返回ACK,有一个超时时间,如果Kafka在规定时间内没有返回ACK,说明数据丢失了
      • 生产者有重试机制,重新发送这条数据给Kafka
      • 问题:如果ack在中途丢失,Kafkahi导致数据重复问题,怎么解决?

知识点02:课程目标

  1. 生产数据的分区规则?【重点】

    • 常见的规则

      • MapReduce:Hash分区

        • 优点:相同的Key会进入同一个分区

          • reduce0:part-r-00000

            hadoop		6
            spark		7
            
          • reduce1:part-r-00001

            hbase		9
            hive		10
            
        • 缺点:数据倾斜的问题

          • 如果所有Key的Hash取余的结果一样,导致数据分配不均衡的问题
      • Hbase:范围分区

      • 轮询分区

        • 优点:数据分配更加均衡
        • 缺点:相同Key的数据进入不同的分区中
      • 随机分区

      • 槽位分区

    • 有几种规则?

    • 每种规则的优缺点是什么?

  2. Kafka中消费数据如何保证不丢失不重复【重点】

    • Kafka中的消费者是如何消费数据?
      • 消费者根据每个分区的Offset来进行消费
      • 问题:数据丢失和数据重复的问题
    • 如何解决数据安全的问题?
      • 如果保证每次消费的offset是正确的offset?

知识点03:生产分区规则

  • 目标掌握Kafka生产者生产数据的分区规则

  • 路径

    • 问题:为什么生产数据的方式不同,分区的规则就不一样?

      - ProducerRecord(Topic,Value)
      - ProducerRecord(Topic,Key,Value)
      - ProducerRecord(Topic,Partition,Key,Value)
      
    • step1:先判断是否指定了分区

      - ProducerRecord(Topic,Partition,Key,Value)
      
    • step2:再判断是否给定了Key

      • 指定了Key

        - ProducerRecord(Topic,Key,Value)
        
      • 没有指定Key

        - ProducerRecord(Topic,Value)
        
  • 实施

    • 如果指定了分区的规则:写入所指定的分区中

      image-20210331090404644

    • 如果没指定分区

      image-20210331090558530

      • 默认调用的是DefaultPartitioner分区器中partition这个方法

        image-20210331090803709

        • 如果指定了Key:按照Key的Hash取余分区的个数,来写入对应的分区

        • 如果没有指定Key

          • 2.x之前:轮询分区

            • 优点:数据分配相对均衡

              Topic		part		key		value
              topic		0			1		itcast1
              topic		1			2		itcast2
              topic		2			3		itcast3
              topic		0			4		itcast4
              topic		1			5		itcast5
              topic		2			6		itcast6
              topic		0			7		itcast7
              topic		1			8		itcast8
              topic		2			9		itcast9
              
            • 缺点:性能非常差

              • Kafka生产者写入数据:先将数据放入一个缓存中,与分区构建一个连接,发送一个批次的数据
              • 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第一条
              • 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条
              • ……
              • 每条数据需要构建一个批次,9条数据,9个批次,每个批次一条数据
              • 批次多,每个批次数据量少,性能比较差
              • 希望:批次少,每个批次数据量多,性能比较好
          • 2.x之后:黏性分区

            • 设计:让数据尽量的更加均衡,实现少批次多数据

            • 规则

              • 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中

                bigdata01	1	37	null	itcast0
                bigdata01	1	38	null	itcast1
                bigdata01	1	39	null	itcast2
                bigdata01	1	40	null	itcast3
                bigdata01	1	41	null	itcast4
                bigdata01	1	42	null	itcast5
                bigdata01	1	43	null	itcast6
                bigdata01	1	44	null	itcast7
                bigdata01	1	45	null	itcast8
                bigdata01	1	46	null	itcast9
                
              • 第二次开始根据缓存中是否有上一次的编号

                • 有:直接使用上一次的编号
                • 如果没有:重新随机选择一个
  • 小结

    • Kafka中生产数据的分区规则是什么?
    • 是否指定了分区
      • 就写入对应的分区
    • 如果没有执行分区
      • 是否指定了Key:Key的Hash取余分区
      • 没有指定Key:黏性分区
        • 尽量保证数据均衡前提下,实现少批次多数据

知识点04:自定义开发生产分区器

  • 目标掌握Kafka自定义开发生产分区器,以随机分区为例

  • 路径

    • step1:开发一个类实现Partitioner接口
    • step2:实现partition方法
    • step3:生产者加载分区器
  • 实施

    • 开发一个随机分区器

      package bigdata.itcast.cn.kafka.partition;
      
      import org.apache.kafka.clients.producer.Partitioner;
      import org.apache.kafka.common.Cluster;
      
      import java.util.Map;
      import java.util.Random;
      
      /**
       * @ClassName UserPartition
       * @Description TODO 自定义分区器,实现随机分区
       * @Date 2021/3/31 9:21
       * @Create By     Frank
       */
      public class UserPartition implements Partitioner {
      
          /**
           * 返回这条数据对应的分区编号
           * @param topic:Topic的名
           * @param key:key的值
           * @param keyBytes:key的字节
           * @param value:value的值
           * @param valueBytes:value的字节
           * @param cluster:集群对象
           * @return
           */
          @Override
          public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
              //获取Topic的分区数
              Integer count = cluster.partitionCountForTopic(topic);
              //构建一个随机对象
              Random random = new Random();
              //随机得到一个分区编号
              int part = random.nextInt(count);
              return part;
          }
      
          @Override
          public void close() {
              //释放资源
          }
      
          @Override
          public void configure(Map<String, ?> configs) {
              //获取配置
          }
      }
      
      
    • 加载分区器

      //指定分区器的类
      props.put("partitioner.class","bigdata.itcast.cn.kafka.partition.UserPartition");
      
  • 小结

    • 如何构建一个自定义分区器
    • step1:构建一个类实现Partitioner接口
    • step2:实现partition方法:定义分区逻辑
    • step3:加载指定分区器即可

知识点05:消费者消费过程及问题

  • 目标掌握Kafka消费者消费过程及消费问题

  • 路径

    • step1:消费者是如何消费Topic中的数据的?
    • step2:如果消费者故障重启,消费者怎么知道自己上次消费的位置的?
  • 实施

    • Kafka中消费者消费数据的规则

      • 消费者消费Kafka中的Topic根据Offset进行消费,每次从上一次的位置继续消费

      • 第一次消费规则:由属性决定

        auto.offset.reset=latest | earliest
        latest:默认的值,从Topic每个分区的最新的位置开始消费
        earliest:从最早的位置开始消费,每个分区的offset为0开始消费
        
      • 第二次消费开始:根据上一次消费的Offset位置+1继续进行消费

      • image-20210331094448841

      • 问题1:消费者如何知道上一次消费的位置是什么?

        • 每个消费者都将自己上一次消费的offset记录自己的内存中
      • 问题2:如果因为网络资源原因,消费者故障了,重启消费者,原来内存中offset就没有了,消费者怎么知道上一次消费的位置?

    • Kafka Offset偏移量管理

      • Kafka将每个消费者消费的位置主动记录在一个Topic中:__consumer_offsets
      • 如果下次消费者没有给定请求offset,kafka就根据自己记录的offset来提供消费的位置

      image-20210331095219512

    • 提交的规则:根据时间自动提交

      props.setProperty("enable.auto.commit", "true");//是否自动提交offset
      props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
      
  • 小结

    • 消费者如何消费kafka中的Topic的数据
      • 第一次消费:根据属性
        • auto.offset.reset:lastest/earliest
      • 第二次消费开始:消费者在内存中记录上一次消费的offset + 1 = 这一次要消费的位置
    • 问题:如果消费者故障,重启,不知道上一次的位置怎么办?
      • Kafka中记录了每个消费者这一次要消费的位置
      • 由消费者定期的自动提交

知识点06:自动提交问题

  • 目标了解Kafka自动提交Offset存在的问题

  • 路径

    • step1:自动提交是否会出现数据丢失问题
    • step2:自动提交是否会出现数据重复问题
  • 实施

    • 自动提交的规则

      • 根据时间周期来提交下一次要消费的offset

        props.setProperty("enable.auto.commit", "true");//是否自动提交offset
        props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
        
    • 数据丢失的情况

      • 如果刚消费,还没处理,就达到提交周期,记录了当前 的offset
      • 最后处理失败,需要重启,重新消费处理
      • Kafka中已经记录消费过了,从上次消费的后面进行消费
    • 数据重复的情况

      • 如果消费并处理成功,但是没有提交offset,程序故障
      • 重启以后,kafka中记录的还是之前的offset,重新又消费一遍
      • 数据重复问题
  • 小结

    • 消费是否成功,是根据处理的结果来决定的
    • 提交offset是根据时间来决定了
    • 需要:根据处理的结果来决定是否提交offset
      • 如果消费并处理成功:提交offset
      • 如果消费处理失败:不提交offset

知识点07:实现手动提交Topic的Offset

  • 目标了解Kafka如何实现手动提交Topic的Offset实现

  • 路径

    • step1:关闭自动提交
    • step2:消费完整后手动提交
  • 实施

    • 关闭自动提交

              props.setProperty("enable.auto.commit", "false");//是否自动提交offset
      //        props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
      
    • 手动提交Offset

      //处理完成以后,手动提交offset
                  consumer.commitSync();
      
  • 小结

    • 关闭自动提交
    • 根据处理的结果来实现手动提交
      • 如果成功以后,再提交

知识点08:手动提交Topic Offset的问题

  • 目标了解Kafka实现手动提交Topic的Offset的问题

  • 路径

    • step1:Offset的设计层次
      • Offset是什么级别的概念?
        • 分区级别的概念
    • step2:手动提交Topic Offset出现数据重复问题
    • step3:解决方案是什么?
  • 实施

    • Offset的设计

      • Offset是分区级别,每个分区单独管理一套offset
    • 手动提交Topic Offset的过程中会出现数据重复

      • 举个栗子

      • 一个消费者,消费一个Topic,Topic有三个分区

        • 第一次消费

          • part0

            0	hadoop
            1	hive
            
          • part1

            0	hive
            1	spark
            2	hue
            
          • part2

            0	spark
            1	hadoop
            
        • 问题:part0和part1都处理成功,当处理part2时候,程序故障,处理使用

          • offset有没有提交?没有提交
          • 重启消费者:Kafka中没有消费记录,但是消费者刚刚分区0和分区1已经消费成功了
          • 所有分区都重新消费
    • 原因

      • Offset是分区级别的
      • 提交offset是按照整个Topic级别来提交的
    • 解决

      • 提交offset的时候,按照分区来提交
      • 消费成功一个分区,就提交一个分区的offset
  • 小结

    • 导致问题:数据重复
    • 导致原因:offset是分区级别,提交时topic级别,只要有一个分区失败,整个提交失败,实际上部分分区已经处理成功了

知识点09:手动提交分区Offset的实现

  • 目标掌握Kafka实现手动提交Partition的Offset

  • 路径

    • step1:消费每个分区的数据
    • step2:处理输出每个分区的数据
    • step3:手动提交每个分区的Offset
  • 实施

    • 获取所有数据中的分区

      Set<TopicPartition> partitions = records.partitions();
      
    • 从所有数据中取出每个分区的数据,输出,手动提交分区的Offset

      //取出每个Partition的数据
                  for (TopicPartition partition : partitions) {
                      //将这个partition的数据从records中取出
                      List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                      //遍历这个分区的每一条数据
                      //取出每一条数据
                      long offset = 0;
                      for (ConsumerRecord<String, String> record : partRecords) {
                          //获取topic
                          String topic = record.topic();
                          //获取分区
                          int part= record.partition();
                          //获取offset
                          offset = record.offset();
                          //获取Key
                          String key = record.key();
                          //获取Value
                          String value = record.value();
                          System.out.println(topic+"\\t"+part+"\\t"+offset+"\\t"+key+"\\t"+value);
                      }
                      //分区数据处理结束,提交分区的offset
                      Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                      consumer.commitSync(offsets);
                  }
      
    • 观察结果

      • 每个分区处理成功,就提交offset
  • 小结

    • 怎么实现基于分区提交offset
    • step1:获取所有数据
    • step2:获取所有分区
    • step3:处理每个分区的数据
    • step4:处理成功,提交分区处理的offset + 1
    • 注意:工作中:一般不将Offset由Kafka存储,一般自己存储
      • 如果处理成功:将offset存储在mysql或者Zookeeper中
      • 如果重启程序:从MySQL或者Zookeeper读取上一次的offset来实现

知识点10:指定消费Topic分区的数据

  • 目标掌握Kafka如何实现消费指定分区的数据

  • 路径

    • step1:构建Topic分区对象
    • step2:指定消费Topic的分区
    • step3:输出消费结果
  • 实施

    • 构建Topic分区对象

      //构建分区对象
      TopicPartition part0 = new TopicPartition("bigdata01", 0);
      TopicPartition part1 = new TopicPartition("bigdata01", 1);
      
    • 指定消费Topic分区

      //指定消费某些分区的数据
      consumer.assign(Arrays.asList(part0,part1));
      
    • 观察结果

      image-20210331111428637

  • 小结

    • 构建Topic的分区对象:TopicPartition
    • 消费者指定消费分区:consumer.assign(Collection)

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

7943)]

  • 小结

    • 构建Topic的分区对象:TopicPartition
    • 消费者指定消费分区:consumer.assign(Collection)

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

以上是关于分布式实时消息队列Kafka的主要内容,如果未能解决你的问题,请参考以下文章

分布式实时消息队列Kafka

分布式实时消息队列Kafka

分布式实时消息队列Kafka

Kafka 消息队列系列之分布式消息队列Kafka

分布式消息队列之kafka

消息队列和缓存的区别