一.Kafka入门到精通-Kafka快速入门

Posted 墨家巨子@俏如来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一.Kafka入门到精通-Kafka快速入门相关的知识,希望对你有一定的参考价值。

前言

在内卷严重的程序员圈子中,原地踏步就是退步,所以不能再躺平啦,赶紧爬起来学习,接下来博主将推出《Kafka入门到精通》系列文章,让你可以在企业中玩起Kafka来得心应手,此乃升职加薪必备呀。

Kafka认识

Kafka 是由Linkedin公司开发的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,是一款基于发布订阅模式的开源消息引擎系统。相对于其他的消息组件来说Kafka拥有更好的吞吐量、内置分区、具有复制和容错的功能,这使它成为一个非常理想的大型消息处理应用。使用场景如:网页浏览记录,日志收集,监控数据等等。

Kafka 的标准定位是分布式流式处理平台,早期的定位是以消息引擎的身份出现的,随着 Kafka 的不断演进, Kafka 开发团队日益发现经 Kafka 交由下游数据处理平台做的事情 Kafka 自己也可以做,因此在 Kafka 0.10.0.0 版本正式推出了 Kafka Streams ,即流式处理组件 。自 Kafka 正式成为了 个流式处理框架,而不仅仅是消息引擎了。

如图所示,Kafka的工作流程为

  • 生产者发送消息到Kafka集群
  • 消费者从Kafka拉取消息
  • Kafka依赖于Zookeeper处理服务的协调

Kafka快速安装(windows)

第一步:下载kafka ,https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

下载后解压,由于Kafka 使用 ZooKeeper作为服务协调工具, 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器,Kafka 内置提供了 ZooKeeper 服务器以及 组相关的管理脚本,我们直接使用这个内置ZooKeeper 即可,进入到bin\\windows 目录,执行如下列命令(windows):

zookeeper-server-start.bat ../../config/zookeeper.properties

zookeeper.properties是作为zookeeper的配置文件,比如你想修改zookeeper的默认端口通过配置文件修 clientPort=2181项即可 ,如下:

如果是在linux启动,cd到bin目录,执行下面命令

zookeeper-server-start.sh ../config/zookeeper.properties

启动效果如下

对于运行 Kafka 言,至少要求安装Java 7版本 ,Zookeeper的端口是2181接下来我们启动 Kafka 务器,进入到bin\\windows 目录,执行如下列命令(windows):

kafka-server-start.bat ../../config/server.properties

server.properties作为kafka的配置文件,我们关注三个配置,你也可以根据情况进行修改

broker.id=0  #如果要集群部署,broker.id不能重复
advertised.listeners=PLAINTEXT://127.0.0.1:9092  #kafka的地址和端口 
zookeeper.connect=127.0.0.1:2181 #zookeeper的地址和端口

启动效果如下

控制台输出结尾处的“ Kafka Server O], started ,标志 Kafka 服务器启动成功,默认的服务端口是 9092

Kafka快速入门

下面我们基于SpringBoot来快速入门Kafka,做一个发送消息和接收消息的案例

第一步:搭建SpringBoot工程,导入依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

第二步,创建启动类,yml配置如下

spring:
  application:
    name: application-kafka
  kafka:
    bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-consumer-group #消费者的ID,这个对应 config/consumer.properties中的group.id

第三步:创建测试类,发送消息, 通过 注入 KafkaTemplate 发送


@SpringBootTest(classes = KafkaApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest 

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Test
    public void send() throws InterruptedException 
        //发送消息:第一个参数是topic,第二个参数是消息的内容。Topic我们可以理解为对消息的分类
        kafkaTemplate.send("topic-hello", "你好kafka");
        System.out.println("发送完成");
        Thread.sleep(2000);
    


第四步:创建消费者,接收消息, 通过 :@KafkaListener(topics = “topic”) 监听topic中的消息

@Component
public class HelloConsumer 

    @KafkaListener(topics = "topic-hello")
    public void handler(String message)
        System.out.println("收到消息:"+message);
    


第五步:使用命令 创建Topic,否则会找不到Topic

创建名为“topic-hello”的TOPIC , 进入 bin/windows目录,cmd执行:

kafka-topics.bat --create --zookeeper localhost:2181 --topic topic-hello --partitions 1 --replication-factor 1

执行测试方法,查看控制台效果

文章结束,希望对你有所帮助,喜欢的话点赞收藏加评论哦

以上是关于一.Kafka入门到精通-Kafka快速入门的主要内容,如果未能解决你的问题,请参考以下文章

kafka入门到精通

三.Kafka入门到精通-SpringBoot整合Kafka(上)

二.Kafka入门到精通-Kafka核心概念

二.Kafka入门到精通-Kafka核心概念

三.Kafka入门到精通-SpringBoot整合Kafka(同步&异步消息&事务消息&手动确认)

kafka入门