Kafka在SpringBoot 2.0中的整合

Posted weafteam

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka在SpringBoot 2.0中的整合相关的知识,希望对你有一定的参考价值。

一、Windows平台Kafka的环境搭建

注意:确保JAVA环境变量的正确

1.ZooKeeper的安装

 
   
   
 
  1. dataDir=D:\data\logs\zookeeper

  2. dataLogDir=D:\data\logs\zookeeper

然后进入bin目录双击zkServer.cmd运行。如下图:Kafka在SpringBoot 2.0中的整合

2.Kafka的安装

 
   
   
 
  1. log.dirs=D:\data\logs\kafka

我们可以看到bin目录下的是linux的启动脚本,然后有个单独的文件夹装着windows的脚本。 我们在根目录下打开命令行,运行以下命令启动Kafka。 我们在运行前需要注意以下几点

  1. 确认JAVA环境变量没有问题

  2. 路径不能有空格,不然可能会出现无法加载主类的错误。

  3. 出现无法加载主类错误,可修改bin\windows目录中的kafka-run-class.bat中 set COMMAND=%JAVA% %KAFKAHEAPOPTS% %KAFKAJVMPERFORMANCEOPTS% %KAFKAJMXOPTS% %KAFKALOG4JOPTS% -cp %CLASSPATH% %KAFKAOPTS% %* 中"%CLASSPATH%"加上双引号

Kafka在SpringBoot 2.0中的整合

 
   
   
 
  1. .\bin\windows\kafka-server-start.bat .\config\server.properties

二、SpringBoot2.0相关配置

pom文件加入以下依赖

pom.xml
 
   
   
 
  1. <!-- kafka -->

  2. <dependency>

  3.    <groupId>org.springframework.kafka</groupId>

  4.    <artifactId>spring-kafka</artifactId>

  5.    <version>2.1.5.RELEASE</version>

  6. </dependency>

我这里SpringBoot的配置文件使用的是YAML。 在相应环境中配置Kafka

application-local.yml
 
   
   
 
  1. server:

  2.    port: 7777

  3. spring:

  4.    datasource:

  5.        name: test

  6.        driverClassName: com.mysql.jdbc.Driver

  7.        url: jdbc:mysql://.....

  8.        username: ...

  9.        password: ....

  10.    redis:

  11.      database: 0

  12.      host: localhost

  13.      port: 6379

  14.      jedis:

  15.        pool:

  16.          min-idle: 0

  17.          max-idle: 8

  18.          max-active: 8

  19.          max-wait: -1ms

  20.      password: 123456

  21.    kafka:

  22.        consumer:

  23.          group-id: foo

  24.          auto-offset-reset: earliest

  25.          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  26.          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  27.        producer:

  28.          key-serializer: org.apache.kafka.common.serialization.StringSerializer

  29.          value-serializer: org.apache.kafka.common.serialization.StringSerializer

  30.        bootstrap-servers: localhost:9092

  31. app:

  32.      topic:

  33.        foo: foo.t

可以仅关注spring.kafka和app.topic节点 更多spring.kafka配置信息请查看官网文档

三、代码

主要代码结构Kafka在SpringBoot 2.0中的整合消费者代码

 
   
   
 
  1. package com.xxx.kafka.consumer;

  2. import lombok.extern.slf4j.Slf4j;

  3. import org.springframework.kafka.annotation.KafkaListener;

  4. import org.springframework.messaging.handler.annotation.Payload;

  5. import org.springframework.stereotype.Service;

  6. /**

  7. * @Author :yaxuSong

  8. * @Description:

  9. * @Date: 17:56 2018/4/23

  10. * @Modified by:

  11. */

  12. @Slf4j

  13. @Service

  14. public class Receiver {

  15.    @KafkaListener(topics = "${app.topic.foo}")

  16.    public void listen(@Payload String message) {

  17.        log.info("received message='{}'", message);

  18.    }

  19. }

生产者代码

 
   
   
 
  1. package com.xxx.kafka.producer;

  2. import lombok.extern.slf4j.Slf4j;

  3. import org.springframework.beans.factory.annotation.Autowired;

  4. import org.springframework.beans.factory.annotation.Value;

  5. import org.springframework.kafka.core.KafkaTemplate;

  6. import org.springframework.stereotype.Service;

  7. /**

  8. * @Author :yaxuSong

  9. * @Description:

  10. * @Date: 17:57 2018/4/23

  11. * @Modified by:

  12. */

  13. @Service

  14. @Slf4j

  15. public class Sender {

  16.    @Autowired

  17.    private KafkaTemplate<String, String> kafkaTemplate;

  18.    @Value("${app.topic.foo}")

  19.    private String topic;

  20.    public void send(String message){

  21.        log.info("sending message='{}' to topic='{}'", message, topic);

  22.        kafkaTemplate.send(topic, message);

  23.    }

  24. }

测试代码

 
   
   
 
  1. package com.xxx.controller;

  2. import com.xxx.controller.entry.ResMsg;

  3. import com.xxx.Sender;

  4. import lombok.extern.slf4j.Slf4j;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.web.bind.annotation.RequestMapping;

  7. import org.springframework.web.bind.annotation.RestController;

  8. /**

  9. * @Author :yaxuSong

  10. * @Description:

  11. * @Date: 11:21 2018/4/21

  12. * @Modified by:

  13. */

  14. @Slf4j

  15. @RequestMapping("test")

  16. @RestController

  17. public class TestController {

  18.    @Autowired

  19.    private Sender sender;

  20.    @RequestMapping("send")

  21.    public ResMsg send(String content){

  22.        sender.send("Spring Kafka and Spring Boot Send Message:"+content);

  23.        return ResMsg.success();

  24.    }

  25. }

四、简单的测试

运行项目Kafka在SpringBoot 2.0中的整合测试发送查看接收结果:

五、SpringBoot-Demo


以上是关于Kafka在SpringBoot 2.0中的整合的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合Kafka

SpringBoot整合Kafka

SpringBoot整合Kafka

SpringBoot整合Kafka

SpringBoot进阶教程(六十二)整合Kafka

springboot 整合kafka