Kafka 入门篇

Posted Think_Higher

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 入门篇相关的知识,希望对你有一定的参考价值。

文章目录

ELK分布式日志收集快速入门--kafka单体篇

目录

环境搭建

前置条件

  1. JDK
    安装教程自行百度-这个比较简单。

  2. zookeeper

    1. zookeeper安装参考地址((2条消息) 快速搭建-分布式远程调用框架搭建-dubbo+zookper+springboot demo 演示_康世行的博客-CSDN博客

      1 ,复制 conf 文件夹下面的 zoo_sample.cfg 改名为 zoo.cfg 即可。因为没有配置文件,zookeeper 无法启动
      2 创建 dataDir 的临时目录  mkdir -p /temp/zookeeper //配置文件参考下面截图 修改完配置文件之后进行启动
      3 ,启动  sh zkServer.sh start
      
    2. 修改zookeeper配合文件

    3. 启动成功

      ps aux|grep zookeeper
      

    4. 开放端口号

      1 开放2181 端口
        1.1 查看已经开发的端口 ,避免端口冲突
         firewall-cmd --list-ports 
        1.2 开放2181 端口
        firewall-cmd --zone=public --add-port=2181/tcp --permanent
      2 重启防火墙 使用规则生效
      	firewall-cmd --reload 
      2(因为我使用的是腾讯云,所以还得把腾讯云的控制台防火墙端口放
      开 2181
      
      

    安装kafka 2.12.x 版本

    1. 下载kafka安装包

      cd /opt
      
      wget http://archive.apache.org/dist/kafka/2.8.2/kafka_2.12-2.8.2.tgz
      
      1. 安装遇到的问题(由于网站证书不安全导致)

      2. 解决方案

        sudo yum install -y ca-certificates   //继续使用weget进行下载
        

      3. 下载成功

    2. 安装

      1. 解压

         tar zxvf kafka_2.12-2.8.2.tgz
        

      2. 进入kafka目录

        cd kafka_2.12-2.8.2/
        创建 logs 目录
        mkdir logs
        

      3. 修改配置文件

        # 修改以下配置
        # 1.broker.id : 配置的是集群环境,要求每台kafka都有唯一的brokerid
        # 2.log.dir : 数据存放的目录
        # 3.zookeeper.connect : zookeeper连接池地址信息(zookeeper集群)
        # 4.delete.topic.enable : 是否直接删除topic
        # 5.host.name : 主机名称
        # 6.listeners=PLAINTEXT://server1:9092
        
        vim /opt/kafka_2.12-2.8.1/config/server.properties
        
        advertised.listeners=PLAINTEXT://:9092  //在配置文件把这行注释解开
        log.dirs=/opt/kafka_2.12-2.8.1/logs
        zookeeper.connect=server1:2181,server2:2181,server3:2181
        # 文件尾部添加以下内容
        delete.topic.enable=true
        # 退出并保存
        
        
      4. 启动

        ./kafka-server-start.sh -daemon ../config/server.properties //后台启动
        ps aux|grep kafka  //查询kafka 运行状态
        

使用示例(发送消息)

  1. 服务器端测试kafka发送消息和消费消息

    1. 创建topic

      ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
      

    2. 查看已经创建的topic

      ./kafka-topics.sh --list --zookeeper localhost:2181
      
      

    3. 发送消息

      ./kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
      

    4. 消费消息

      ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
      
      

  2. 代码测试

    pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.8.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>springBoot-kafka-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springBoot-kafka-demo</name>
        <description>springBoot-kafka-demo</description>
        <properties>
            <java.version>1.8</java.version>
            <fastjson.version>1.2.58</fastjson.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- kafkfa -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>$fastjson.version</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    yml

    server:
      port: 8081
    spring:
      application:
        name: kafka-demo
      kafka:
        bootstrap-servers: 124.222.227.132:9092
        consumer:
          group-id:  kafka-demo-kafka-group
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    

    controller

    package com.example.springbootkafkademo.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @Description 测试发送消息
     * @ClassName TestController
     * @Author 康世行
     * @Date 22:09 2023/2/5
     * @Version 1.0
     **/
    @RestController
    @RequestMapping("/test")
    public class TestController 
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        @GetMapping("/send/msg/topic")
        public String sendMessage(@PathVariable("msg") String msg,@PathVariable("topic") String topic)
            //发送消息到kafka
            kafkaTemplate.send(topic,msg);
            return "发送成功!";
        
    
    
    
    

    service

    package com.example.springbootkafkademo.service;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    /**
     * @Description 监听发送的消息
     * @ClassName HelloListener
     * @Author 康世行
     * @Date 22:14 2023/2/5
     * @Version 1.0
     **/
    @Service
    public class HelloListener 
    
        /**
         * 消费者端:指定监听话题
         *
         * @param consumerRecord 监听到数据
         */
        @KafkaListener(topics = "testTopic")
        public void handlerMsg(ConsumerRecord<String, String> consumerRecord) 
            System.out.println("接收到消息:消息值:" + consumerRecord.value() + ",         消息偏移量:" + consumerRecord.offset());
        
    
    
    
    

    测试

    127.0.0.1:8081/test/send/测试kafka发送消息32/testTopic

    测试结果

    ``

    感谢阅读~~,希望对您有帮助。

以上是关于Kafka 入门篇的主要内容,如果未能解决你的问题,请参考以下文章

想自学Java的速来!java研究所改名

别再说自己不会了!java找文件位置

kafka基础篇——kafka生产者客户端

全网最通俗易懂的Kafka入门!

Java面试题(Kafka篇+zookeeper 篇)

Java面试题(Kafka篇+zookeeper 篇)