分布式实时消息队列Kafka

Posted 大数据Manor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式实时消息队列Kafka相关的知识,希望对你有一定的参考价值。

分布式实时消息队列Kafka(二)

知识点01:课程回顾

  1. 什么是消息队列?

    • 用于两个系统之间或者两个模块之间实现消息传递,基于队列机制实现数据缓存
  2. 消息队列的优点是什么?

    • 实现解耦
    • 通过异步,提高性能
  3. 消息队列的缺点是什么?

    • 架构更加复杂:如果消息队列出现故障,整个系统都会故障
      • 分布式集群
      • 副本机制
    • 数据维护更加复杂:不丢失,不重复
      • 生产安全:幂等性机制
      • 消费安全:Offset
  4. 什么是同步与异步?

    • 同步:立即一致性
    • 异步:最终一致性
  5. 什么是Kafka?

    • Kafka是一个基于订阅发布模式的高性能、高吞吐的实时消息队列系统
  6. Kafka在大数据的应用场景是什么?

    • 用于实时架构中:实现数据的临时存储
  7. Kafka中的Producer、Consumer、Consumer Group 、Broker分别是什么?

    • Producer:生产者,负责写入数据到Kafka
    • Consumer:消费者,负责从Kafka消费读取数据
    • Consumer Group:消费者组
      • Kafka中的数据消费必须以消费者组为单位
      • 一个消费者组可以包含多个消费者,注意多个消费者消费的数据加在一起是一份完整的数据
      • 目的:提高性能
        • 消费者组消费Topic
        • 消费者组中的消费者消费Topic的分区
    • Broker:Kafka一个节点
      • 多个节点,构建Kafka集群
      • 主从架构:类似于Zookeeper
        • HDFS:NameNode、DataNode
        • Hbase:HMaster、HRegionServer
        • Kafka:Kafka
          • 主:Kafka Controler
          • 从:Kafka Broker
          • 启动Kafka时候,会从所有的Broker选举一个Controler,如果Controller故障,会从其他的Broker重新选举一个
          • 选举:使用ZK是实现辅助选举
  8. Kafka中的Topic与Partition是什么?

    • Topic:逻辑上实现数据存储的分类,类似于数据库中的表概念

    • Partition:Topic中用于实现分布式存储的物理单元,一个Topic可以有多个分区

      • 每个分区可以存储在不同的节点,实现分布式存储
    • 副本机制:Kafka中每个分区可以构建多个副本【副本个数 <= 机器的个数】

      • 将一个分区的多个副本分为两种角色

      • leader副本:负责对外提供读写请求

      • follower副本:负责与leader同步数据,如果leader故障,follower要重新选举一个成为leader

        • 选举:不由ZK实现选举,由Kafka Crontroller来决定谁是leader
  9. Kafka中的Segment是什么?

    • Segment:对分区内部的数据进行更细的划分,分区段,文件段

      • 类似于Region中划分store

      • 规则:按照文件产生的时间或者大小

      • 目的:提高写入和查询性能

        • 文件名称可以用于检索数据:用offset命名的
      • 组成:每个Segment由两个文件组成

        • .log:存储的数据
        • .index:对应.log文件的索引信息
  10. Kafka中的Offset是什么?

    • Offset是kafka中存储数据时给每个数据做的标记或者编号
      • 分区级别的编号
      • 从0开始编号
    • 功能:消费者根据offset来进行消费,保证顺序消费,数据安全

知识点02:课程目标

  1. Kafka的集群如何搭建启动?

    • 实现Kafka分布式集群的安装部署【按照笔记一步步搭建】
  2. Kafka的Topic如何创建管理?【掌握】

    • 命令行实现

    • 创建Topic

    • 查看Topic信息

    • 删除、列举Topic

  3. Kafka的Java API如何实现?【掌握类和方法】

    • Java API
    • 开发生产者
    • 开发消费者

知识点03:Kafka集群架构

  • 目标了解Kafka集群架构及角色功能

  • 路径

    img

    • Kafka集群有哪些角色?
    • Kafka每个角色的功能是什么?
    • Zookeeper在架构中的作用是什么?
  • 实施

    • 架构角色
      • Kafka
      • Zookeeper
    • Kafka中的每个角色以及对应的功能
      • 分布式主从架构
      • 主:Kafka Controller
        • 负责管理所有从节点:Topic、分区和副本
        • 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现
      • 从:Kafka Broker
        • 对外提供读写请求
        • 其他的Broker监听Controller,如果Controller,会重新从Broker选举一个新的
    • ZK的功能
      • 辅助选举Active的主节点
      • 存储元数据
  • 小结

    • kafka是一个主从架构,整体对外提供分布式读写
    • ZK主要负责选举Controller和实现元数据存储

知识点04:Kafka分布式集群部署

  • 目标实现Kafka分布式集群的搭建部署

  • 路径

    • step1:选择版本
    • step2:下载解压安装
    • step:3:修改配置文件
  • 实施

    • 版本的选型

      • 0.8.x:老的版本,很多的问题
      • 0.10.x +:消息功能上基本没有问题
      • 选择:kafka_2.12-2.4.1.tgz
        • Kafka:2.4.1
        • Scala:2.12,Kafka是由Scala语言开发
    • 下载解压安装

      • 下载:http://archive.apache.org/dist/kafka/

      • 上传到第一台机器

        cd /export/software/
        rz
        
      • 解压

        tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
        cd /export/server/kafka_2.12-2.4.1/
        mkdir logs
        
        • bin:一般用于存放客户端操作命令脚本
        • sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中
        • conf/etc/config:配置文件目录
        • lib:jar包的存放目录
        • logs:一般用于存放服务日志
    • 修改配置

      • 切换到配置文件目录

        cd /export/server/kafka_2.12-2.4.1/config
        

        image-20210330091020349

      • 修改server.properties

        #21行:唯一的 服务端id
        broker.id=0
        #60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
        log.dirs=/export/server/kafka_2.12-2.4.1/logs 
        #123行:指定zookeeper的地址
        zookeeper.connect=node1:2181,node2:2181,node3:2181
        

      #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
      delete.topic.enable=true
      host.name=node1

      
      - 分发
      
      ```shell
      cd /export/server/
      scp -r kafka_2.12-2.4.1 node2:$PWD
      scp -r kafka_2.12-2.4.1 node3:$PWD
      
      • 第二台:server.properties

        #21行:唯一的 服务端id
        

      broker.id=1
      #最后
      host.name=node2

      
      - 第三台:server.properties
      
      ```properties
      #21行:唯一的 服务端id
      broker.id=2
      #最后
      host.name=node3
      
      • 添加环境变量

        vim /etc/profile
        

      #KAFKA_HOME
      export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
      export PATH=: P A T H : PATH: PATH:KAFKA_HOME/bin

      
      ```shell
      source /etc/profile
      
  • 小结

    • 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可
    • 解压安装
    • 修改配置:server.properties

知识点05:Kafka启动与关闭

  • 目标掌握kafka集群的启动与关闭命令及脚本封装

  • 路径

    • step1:如何启动Kafka集群?
    • step2:如何关闭Kafka集群?
    • step3:如何封装启动及关闭脚本?
  • 实施

    • 启动Zookeeper

      /export/server/zookeeper-3.4.6/bin/start-zk-all.sh 
      
    • 启动Kafka

      bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
      
       >>/dev/null 2>&1 &:在后台运行
      
    • 关闭Kafka

      bin/kafka-server-stop.sh 
      
    • 封装Kafka脚本

      • 启动脚本

        vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
        
        #!/bin/bash
        KAFKA_HOME=/export/server/kafka_2.12-2.4.1
        
        for number in {1..3}
        do
                host=node${number}
                echo ${host}
                /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
                echo "${host} started"
        done
        
        chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
        
      • 关闭脚本

        vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
        
        #!/bin/bash
        KAFKA_HOME=/export/server/kafka_2.12-2.4.1
        
        for number in {1..3}
        do
          host=node${number}
          echo ${host}
          /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
          echo "${host} stoped"
        done
        
        chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
        
  • 小结

    • 启动:kafka-server-start.sh
    • 关闭:kafka-server-stop.sh

知识点06:Topic管理:创建与列举

  • 目标掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic

  • 路径

    • step1:Topic脚本的使用
    • step2:创建Topic
    • step3:列举Topic
  • 实施

    • Topic管理脚本

      image-20210330093455692

      • 查看用法

        Create, delete, describe, or change a topic.
        Option                                   Description                            
        ------                                   -----------                            
        --alter                                  Alter the number of partitions,        
                                                   replica assignment, and/or           
                                                   configuration for the topic.         
        --at-min-isr-partitions                  if set when describing topics, only    
                                                   show partitions whose isr count is   
                                                   equal to the configured minimum. Not 
                                                   supported with the --zookeeper       
                                                   option.                              
        --bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
          connect to>                              to. In case of providing this, a     
                                                   direct Zookeeper connection won't be 
                                                   required.                            
        --command-config <String: command        Property file containing configs to be 
          config property file>                    passed to Admin Client. This is used 
                                                   only with --bootstrap-server option  
                                                   for describing and altering broker   
                                                   configs.                             
        --config <String: name=value>            A topic configuration override for the 
                                                   topic being created or altered.The   
                                                   following is a list of valid         
                                                   configurations:                      
                                                        cleanup.policy                        
                                                        compression.type                      
                                                        delete.retention.ms                   
                                                        file.delete.delay.ms                  
                                                        flush.messages                        
                                                        flush.ms                              
                                                        follower.replication.throttled.       
                                                   replicas                             
                                                        index.interval.bytes                  
                                                        leader.replication.throttled.replicas 
                                                        max.compaction.lag.ms                 
                                                        max.message.bytes                     
                                                        message.downconversion.enable         
                                                        message.format.version                
                                                        message.timestamp.difference.max.ms   
                                                        message.timestamp.type                
                                                        min.cleanable.dirty.ratio             
                                                        min.compaction.lag.ms                 
                                                        min.insync.replicas                   
                                                        preallocate                           
                                                        retention.bytes                       
                                                        retention.ms                          
                                                        segment.bytes                         
                                                        segment.index.bytes                   
                                                        segment.jitter.ms                     
                                                        segment.ms                            
                                                        unclean.leader.election.enable        
                                                 See the Kafka documentation for full   
                                                   details on the topic configs.It is   
                                                   supported only in combination with --
                                                   create if --bootstrap-server option  
                                                   is used.                             
        --create                                 Create a new topic.                    
        --delete                                 Delete a topic                         
        --delete-config <String: name>           A topic configuration override to be   
                                                   removed for an existing topic (see   
                                                   the list of configurations under the 
                                                   --config option). Not supported with 
                                                   the --bootstrap-server option.       
        --describe                               List details for the given topics.     
        --disable-rack-aware                     Disable rack aware replica assignment  
        --exclude-internal                       exclude internal topics when running   
                                                   list or describe command. The        
                                                   internal topics will be listed by    
                                                   default                              
        --force                                  Suppress console prompts               
        --help                                   Print usage information.               
        --if-exists                              if set when altering or deleting or    
                                                   describing topics, the action will   
                                                   only execute if the topic exists.    
                                                   Not supported with the --bootstrap-  
                                                   server option.                       
        --if-not-exists                          if set when creating topics, the       
                                                   action will only execute if the      
                                                   topic does not already exist. Not    
                                                   supported with the --bootstrap-      
                                                   server option.                       
        --list                                   List all available topics.             
        --partitions <Integer: # of partitions>  The number of partitions for the topic 
                                                   being created or altered (WARNING:   
                                                   If partitions are increased for a    
                                                   topic that has a key, the partition  
                                                   logic or ordering of the messages    
                                                   will be affected). If not supplied   
                                                   for create, defaults to the cluster  
                                                   default.                             
        --replica-assignment <String:            A list of manual partition-to-broker   
          broker_id_for_part1_replica1 :           assignments for the topic being      
          broker_id_for_part1_replica2 ,           created or altered.                  
          broker_id_for_part2_replica1 :                                                
          broker_id_for_part2_replica2 , ...>                                           
        --replication-factor <Integer:           The replication factor for each        
          replication factor>                      partition in the topic being         
                                                   created. If not supplied, defaults   
                                                   to the cluster default.              
        --topic <String: topic>                  The topic to create, alter, describe   
                                                   or delete. It also accepts a regular 
                                                   expression, except for --create      
                                                   option. Put topic name in double     
                                                   quotes and use the '\\' prefix to     
                                                   escape regular expression symbols; e.
                                                   g. "test\\.topic".                    
        --topics-with-overrides                  if set when describing topics, only    
                                                   show topics that have overridden     
                                                   configs                              
        --unavailable-partitions                 if set when describing topics, only    
                                                   show partitions whose leader is not  
                                                   available                            
        --under-min-isr-partitions               if set when describing topics, only    
                                                   show partitions whose isr count is   
                                                   less than the configured minimum.    
                                                   Not supported with the --zookeeper   
                                                   option.                              
        --under-replicated-partitions            if set when describing topics, only    
                                                   show under replicated partitions     
        --version                                Display Kafka version.                 
        --zookeeper <String: hosts>              DEPRECATED, The connection string for  
                                                   the zookeeper connection in the form 
                                                   host:port. Multiple hosts can be     
                                                   given to allow fail-over. 
        
    • 创建Topic

      bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
      
      • –create:创建
      • –topic :指定操作的Topic的名称
      • –partitions:指定分区个数,默认为1
      • –replication-factor:副本因子,默认为1
      • –bootstrap-server:指定Kafka服务端地址
    • 列举Topic

      bin/kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092
      
      • –list:表示列举

        image-20210330094749604

  • 小结

    • 创建:create
    • 指定分区个数
      • 指定副本个数
    • 列举:list
    • 必选:–bootstrap-server:服务端地址
      • 端口:9092

知识点07:Topic管理:查看与删除

  • 目标掌握Kafka集群中Topic的管理命令,实现查看Topic信息及删除Topic

  • 路径

    • step1:查看Topic详细信息
    • step2:删除Topic
  • 实施

    • 查看Topic信息

      bin/kafka-topics.sh --describe --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
      
      Topic: bigdata02        PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
      
      
              Topic: bigdata02        Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
              Topic: bigdata02        Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
      
      • Partition:分区编号
      • Replicas:分区副本所在的Kafka Broker ID
        • 每个分区的副本有两种角色
        • leader副本
        • follower副本
      • Leader:leader 副本所在的Kafka节点
      • Isr:In-Sync-Replicas:正在同步的副本,可用的副本
        • 用于leader故障时,选举新的leader
    • 删除Topic

      bin/kafka-topics.sh --delete --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
      
  • 小结

    • 查看信息:describe
    • 删除:delete

知识点08:生产者及消费者测试

  • 目标:了解命令行如何模拟测试生产者和消费者

  • 路径

    • step1:构建一个生产者往Topic中生产数据
      • 指定Topic
      • 指定Kafka集群地址
    • step2:构建一个消费者从Topic中消费数据
      • 指定Topic
      • 指定Kafka集群地址
  • 实施

    • 命令行提供的脚本

      image-20210330101943181

    • Console生产者

      bin/kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
      

      image-20210330102711779

    • Console消费者

      bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning
      
      • –from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

      image-20210330102756214

  • 小结

    • 只要生产者不断生产,消费就能实时的消费到Topic中的数据

知识点09:可视化工具Kafka Tool

  • 目标了解Windows版可视化工具Kafka Tool的使用

  • 路径

    • step1:安装Kafka Tool
    • step2:启动构建连接
    • step3:查看Kafka集群信息
  • 实施

    • 安装Kafka Tool:不断下一步即可

      image-20210329163448255

      image-20210329163613726

    • 构建集群连接:连接Kafka集群

      image-20210329163535693

      image-20210329163710836

    • 查看集群信息

      image-20210330104345929

  • 小结

    • 可视化工具,界面或者交互性不是很友好
    • 后面会学习:Kafka Eagle

知识点10:Kafka集群压力测试

  • 目标了解如何实现Kafka集群的吞吐量及压力测试

  • 路径

    • step1:生产压力测试
    • step2:消费压力测试
  • 实施

    • 创建Topic

      bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
      
    • 生产测试

      kafka-producer-perf-test.sh --topic bigdata --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
      
      • –num-records:写入数据的条数
      • –throughput:是否做限制,-1表示不限制
      • –record-size:每条数据的字节大小

      image-20210330104918488

    • 消费测试

      kafka-consumer-perf-test.sh --topic bigdata --broker-list node1:9092,node2:9092,node3:9092  --fetch-size 1048576 --messages 5000000
      

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-86jBAgCD-1625805582217)(20210330_分布式实时消息队列Kafka(二).assets/image-20210330105146154.png)]

  • 小结

    • 工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求

知识点11:Kafka API 的应用

  • 目标了解工作中使用Kafka API的场景

  • 路径

    • step1:工作中使用Kafka的方式
    • step2:Kafka API的分类
  • 实施

    • 命令行使用Kafka

      • 一般只用于topic的管理:创建、删除
    • 大数据架构中使用Kafka

      • Java API:构建生产者和消费者

      • 工作中一般不用自己开发生产者和消费者

      • 生产者:数据采集工具

        • Flume:Kafka sink
          • 配置kafka集群地址
          • Topic的名称
      • 消费者:实时计算程序

        • SparkStream:KafkaUtil

          KafkaUtil.createDirectStream
          
      • 这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可

      • 重点掌握:用到哪些类和方法

    • Kafka的API的分类

      • High Level API:高级API
        • 基于了SimpleAPI做了封装,让用户开发更加方便
        • 但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全
      • Simple API:简单API
        • 并不简单,最原始的API
        • 自定义控制所有消费和生产、保证数据安全
  • 小结

    • 大数据工作中一般不自己开发Java API:掌握类和方法即可
    • 只使用Simple API来实现开发

知识点12:生产者API:构建KafkaProducer

  • 目标了解如何通过Java API构建生产者

  • 路径

    • step1:构建集群配置对象
      • 指定服务端集群地址
    • step2:构建Kafka Porducer对象
      • 加载配置
  • 实施

    • 构建集群配置对象

       //todo:1-构建连接,Kafka生产者对象
      //构建配置对象,指定生产者的配置
      Properties props = new Properties();
      props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定集群地址
      /**
               * acks:表示生产者生产数据时,怎么保证数据不丢失,Kafka接受写入数据以后,可以给生产者返回一个ack,表示收到这条数据,生产者发送下一条
               * 0:生产者不管Kafka有没有返回ack,都直接发送下一条
               *              快、数据容易丢失
               * 1:生产者发送数据给Kafka的某个分区,写入leader副本以后,kafka就返回ack,生产者发送下一条
               *              相对安全机制,有一定的概率,数据会丢失
               * all:生产者发送数据给Kafka的某个分区,写入leader副本并且所有follower同步成功以后,kafka就返回ack,生产者发送下一条
               *              最安全,性能较差
               */
      props.put("acks", "all");
      //指定Key的序列化的类
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      //指定Value的序列化的类
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
      
    • 构建Kafka Producer加载配置

              //构建生产者对象
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
      
  • 小结

    • Properties:构建生产者的配置
    • 集群地址
      • acks
      • 序列化的类
    • KafkaProducer:生产者的对象

知识点13:生产者API:生产数据到Kafka

  • 目标了解如何将数据写入Kafka中

  • 路径

    • step1:构建ProducerRecord对象
    • step2:调用KafkaProducer的send方法将数据写入Kafka
  • 实施

    • 构建Producer对象

      //构建一条数据的对象
      //ProducerRecord(String topic, V value)
      //            ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01","itcast"+i);
      //ProducerRecord(String topic, K key, V value)
      //        ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",i+"","itcast"+i);
      //ProducerRecord(String topic, Integer partition, K key, V value)
      ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",0,i+"","itcast"+i);
      
    • 调用send方法

       //生产数据的数据
       producer.send(record);
      
  • 小结

    • ProducerRecord:表示生产每一条数据
    • Topic
      • Key
      • Value
      • 可选:Partition
    • KafkaProducer:send:写入数据到Kafka

知识点14:消费者API:构建KafkaConsumer

  • 目标了解如何通过Java API构建消费者

  • 路径

    • step1:构建集群配置对象
    • step2:构建Kafka Consumer对象
  • 实施

    • 构建集群配置对象

      //todo:1-构建连接,消费者对象
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "node1:9092");//服务端地址
              props.setProperty("group.id", "test01");//消费者组的id
              props.setProperty("enable.auto.commit", "true");//是否自动提交offset
              props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
              //指定key和value反序列化的类
              props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      
    • 构建Kafka Consumer加载配置

              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
  • 小结

    • Properties:配置对象
    • KafkaConsumer:消费者对象

知识点15:消费者API:消费Topic数据

  • 目标了解如何从Kafka中消费数据

  • 路径

    • step1:消费者订阅Topic
    • step2:调用poll方法从Kafka中拉取数据,获取返回值
    • step3:从返回值中输出:Topic、Partition、Offset、Key、Value
  • 实施

    • 消费者订阅Topic

      //订阅Topic
      consumer.subscribe(Arrays.asList("bigdata01"));
      
    • 拉取数据

      //消费数据
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      
    • 输出数据

      //取出每一条数据
      for (ConsumerRecord<String, String> record : records) {
          //获取topic
          String topic = record.topic();
          //获取分区
          int partition = record.partition();
          //获取offset
          long offset = record.offset();
          //获取Key
          String key = record.key();
          //获取Value
          String value = record.value();
          System.out.println(topic+"\\t"+partition+"\\t"+offset+"\\t"+key+"\\t"+value);
      }
      
  • 小结

    • KafkaConsumer:subscribe:负责订阅Kafka的Topic
    • KafkaConsumer:poll:负责拉取消费数据
    • ConsumerRecords:消费到的所有数据的集合
    • ConsumerRecord:消费到的每一条数据
      • topic:获取Topic
      • partition:获取分区
      • offset:获取offset
      • key:获取key
      • value:获取value

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

cord.key();
//获取Value
String value = record.value();
System.out.println(topic+"\\t"+partition+"\\t"+offset+"\\t"+key+"\\t"+value);
}
```

  • 小结

    • KafkaConsumer:subscribe:负责订阅Kafka的Topic
    • KafkaConsumer:poll:负责拉取消费数据
    • ConsumerRecords:消费到的所有数据的集合
    • ConsumerRecord:消费到的每一条数据
      • topic:获取Topic
      • partition:获取分区
      • offset:获取offset
      • key:获取key
      • value:获取value

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

以上是关于分布式实时消息队列Kafka的主要内容,如果未能解决你的问题,请参考以下文章

分布式实时消息队列Kafka

分布式实时消息队列Kafka

分布式实时消息队列Kafka

Kafka 消息队列系列之分布式消息队列Kafka

分布式消息队列之kafka

消息队列和缓存的区别