SpringBoot中使用kafka
Posted gidybzc
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot中使用kafka相关的知识,希望对你有一定的参考价值。
在能够在windows下使用命令行启动kafka服务器,创建topic、producer、以及consumer后,尝试在JAVA中使用Kafka。
使用IDEA创建SpringBoot项目
这个使用IDEA创建一个新的SpringBoot项目就可以,也可以在https://start.spring.io/下载一个新的项目。
配置依赖
pom文件如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.2.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.example</groupId> 12 <artifactId>demo</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>demo</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <java.version>1.8</java.version> 19 </properties> 20 21 <dependencies> 22 <dependency> 23 <groupId>org.springframework.boot</groupId> 24 <artifactId>spring-boot-starter-web</artifactId> 25 </dependency> 26 27 <dependency> 28 <groupId>org.projectlombok</groupId> 29 <artifactId>lombok</artifactId> 30 <optional>true</optional> 31 </dependency> 32 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-starter-test</artifactId> 36 <scope>test</scope> 37 </dependency> 38 39 <dependency> 40 <groupId>org.springframework.kafka</groupId> 41 <artifactId>spring-kafka</artifactId> 42 <version>2.2.0.RELEASE</version> 43 </dependency> 44 45 <dependency> 46 <groupId>com.google.code.gson</groupId> 47 <artifactId>gson</artifactId> 48 <version>2.8.2</version> 49 </dependency> 50 51 </dependencies> 52 53 <build> 54 <plugins> 55 <plugin> 56 <groupId>org.springframework.boot</groupId> 57 <artifactId>spring-boot-maven-plugin</artifactId> 58 </plugin> 59 </plugins> 60 </build> 61 62 </project>
主要引入了 spring-kafka 、lombok 、 gson 依赖。
相关配置
1 #============== kafka =================== 2 # 指定kafka 代理地址,可以多个 3 spring.kafka.bootstrap-servers=127.0.0.1:9092 4 5 #=============== provider ======================= 6 7 spring.kafka.producer.retries=0 8 # 每次批量发送消息的数量 9 spring.kafka.producer.batch-size=16384 10 spring.kafka.producer.buffer-memory=33554432 11 12 # 指定消息key和消息体的编解码方式 13 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 14 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 15 16 #=============== consumer ======================= 17 # 指定默认消费者group id 18 spring.kafka.consumer.group-id=test-consumer-group 19 20 spring.kafka.consumer.auto-offset-reset=earliest 21 spring.kafka.consumer.enable-auto-commit=true 22 spring.kafka.consumer.auto-commit-interval=100 23 24 # 指定消息key和消息体的编解码方式 25 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 26 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
创建producer类
KafkaSender通过KafkaTemplate发送消息到topic
1 @Component 2 public class KafkaSender { 3 4 private Logger log = LoggerFactory.getLogger(KafkaSender.class); 5 6 @Resource 7 private KafkaTemplate<String, String> kafkaTemplate; 8 9 private Gson gson = new GsonBuilder().create(); 10 11 //发送消息方法 12 public void send() { 13 Message message = new Message(); 14 message.setId(System.currentTimeMillis()); 15 message.setMsg(UUID.randomUUID().toString()); 16 message.setSendTime(new Date()); 17 log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); 18 kafkaTemplate.send("test22", gson.toJson(message)); 19 } 20 }
创建consumer类
1 @Component 2 public class KafkaReceiver { 3 private Logger log = LoggerFactory.getLogger(KafkaSender.class); 4 5 @KafkaListener(topics = {"test22"}) 6 public void listen(ConsumerRecord<?, ?> record) { 7 Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 8 if (kafkaMessage.isPresent()) { 9 Object message = kafkaMessage.get(); 10 log.info("----------------- record =" + record); 11 log.info("------------------ message =" + message); 12 } 13 } 14 }
message实体类
1 @Data 2 public class Message { 3 private Long id; 4 5 private String msg; 6 7 private Date sendTime; 8 }
我在这里省略了getter、setter方法。
在启动项目之前,请先启动kafka服务,注意端口号是否一致。kafka相关参见前一篇随笔《windows下安装并使用kafka》
参考:https://blog.csdn.net/tzs_1041218129/article/details/78988439
以上是关于SpringBoot中使用kafka的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot中使用Kafka报错:Failed to construct kafka consumer