SpringBoot中使用kafka

Posted gidybzc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot中使用kafka相关的知识,希望对你有一定的参考价值。

在能够在windows下使用命令行启动kafka服务器,创建topic、producer、以及consumer后,尝试在JAVA中使用Kafka。

使用IDEA创建SpringBoot项目

这个使用IDEA创建一个新的SpringBoot项目就可以,也可以在https://start.spring.io/下载一个新的项目。

配置依赖

 pom文件如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <parent>
 6         <groupId>org.springframework.boot</groupId>
 7         <artifactId>spring-boot-starter-parent</artifactId>
 8         <version>2.1.2.RELEASE</version>
 9         <relativePath/> <!-- lookup parent from repository -->
10     </parent>
11     <groupId>com.example</groupId>
12     <artifactId>demo</artifactId>
13     <version>0.0.1-SNAPSHOT</version>
14     <name>demo</name>
15     <description>Demo project for Spring Boot</description>
16 
17     <properties>
18         <java.version>1.8</java.version>
19     </properties>
20 
21     <dependencies>
22         <dependency>
23             <groupId>org.springframework.boot</groupId>
24             <artifactId>spring-boot-starter-web</artifactId>
25         </dependency>
26 
27         <dependency>
28             <groupId>org.projectlombok</groupId>
29             <artifactId>lombok</artifactId>
30             <optional>true</optional>
31         </dependency>
32 
33         <dependency>
34             <groupId>org.springframework.boot</groupId>
35             <artifactId>spring-boot-starter-test</artifactId>
36             <scope>test</scope>
37         </dependency>
38 
39         <dependency>
40             <groupId>org.springframework.kafka</groupId>
41             <artifactId>spring-kafka</artifactId>
42             <version>2.2.0.RELEASE</version>
43         </dependency>
44 
45         <dependency>
46             <groupId>com.google.code.gson</groupId>
47             <artifactId>gson</artifactId>
48             <version>2.8.2</version>
49         </dependency>
50 
51     </dependencies>
52 
53     <build>
54         <plugins>
55             <plugin>
56                 <groupId>org.springframework.boot</groupId>
57                 <artifactId>spring-boot-maven-plugin</artifactId>
58             </plugin>
59         </plugins>
60     </build>
61 
62 </project>

主要引入了 spring-kafka 、lombok 、 gson 依赖。

相关配置

 1 #============== kafka ===================
 2 # 指定kafka 代理地址,可以多个
 3 spring.kafka.bootstrap-servers=127.0.0.1:9092
 4 
 5 #=============== provider  =======================
 6 
 7 spring.kafka.producer.retries=0
 8 # 每次批量发送消息的数量
 9 spring.kafka.producer.batch-size=16384
10 spring.kafka.producer.buffer-memory=33554432
11 
12 # 指定消息key和消息体的编解码方式
13 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
14 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
15 
16 #=============== consumer  =======================
17 # 指定默认消费者group id
18 spring.kafka.consumer.group-id=test-consumer-group
19 
20 spring.kafka.consumer.auto-offset-reset=earliest
21 spring.kafka.consumer.enable-auto-commit=true
22 spring.kafka.consumer.auto-commit-interval=100
23 
24 # 指定消息key和消息体的编解码方式
25 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
26 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

创建producer类

 KafkaSender通过KafkaTemplate发送消息到topic

 1 @Component
 2 public class KafkaSender {
 3 
 4     private Logger log = LoggerFactory.getLogger(KafkaSender.class);
 5 
 6     @Resource
 7     private KafkaTemplate<String, String> kafkaTemplate;
 8 
 9     private Gson gson = new GsonBuilder().create();
10 
11     //发送消息方法
12     public void send() {
13         Message message = new Message();
14         message.setId(System.currentTimeMillis());
15         message.setMsg(UUID.randomUUID().toString());
16         message.setSendTime(new Date());
17         log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
18         kafkaTemplate.send("test22", gson.toJson(message));
19     }
20 }

创建consumer类

 1 @Component
 2 public class KafkaReceiver {
 3     private Logger log = LoggerFactory.getLogger(KafkaSender.class);
 4     
 5     @KafkaListener(topics = {"test22"})
 6     public void listen(ConsumerRecord<?, ?> record) {
 7         Optional<?> kafkaMessage = Optional.ofNullable(record.value());
 8         if (kafkaMessage.isPresent()) {
 9             Object message = kafkaMessage.get();
10             log.info("----------------- record =" + record);
11             log.info("------------------ message =" + message);
12         }
13     }
14 }

message实体类

1 @Data
2 public class Message {
3     private Long id;
4 
5     private String msg;
6 
7     private Date sendTime;
8 }

我在这里省略了getter、setter方法。

在启动项目之前,请先启动kafka服务,注意端口号是否一致。kafka相关参见前一篇随笔《windows下安装并使用kafka

 

参考:https://blog.csdn.net/tzs_1041218129/article/details/78988439

 

以上是关于SpringBoot中使用kafka的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot中使用kafka

SpringBoot Kafka 整合实例教程

SpringBoot中使用Kafka报错:Failed to construct kafka consumer

Windows平台整合SpringBoot+KAFKA__第2部分_代码编写前传

springboot kafka发送消息支持成功失败通知

Springboot配置kafka用户名密码