kafka学习Spring Boot 整合 Kafka
Posted 有梦想的肥宅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka学习Spring Boot 整合 Kafka相关的知识,希望对你有一定的参考价值。
文章更新时间:2020/06/08
一、创建Spring boot 工程
创建过程不再描述,创建后的工程结构如下:
POM文件中要加入几个依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.zhbf</groupId> <artifactId>springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入kafka依赖--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 添加 gson 依赖 --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
启动SpringbootApplication.java,出现下图界面则说明工程创建好了:
二、创建kafka生产者类,并通过控制器调用
kafka生产者类
/** * Kafka消息生产类 */ @Log @Component public class KafkaProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.topic.user}") private String topicUser;//topic名称 /** * 发送用户消息 * * @param user 用户信息 */ public void sendUserMessage(User user) { GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); builder.setDateFormat("yyyy-MM-dd HH:mm:ss"); String message = builder.create().toJson(user); kafkaTemplate.send(topicUser, message); log.info("\\n生产消息至Kafka\\n" + message); } }
application.yml配置文件
启动ZK、kafka通讯的服务器broker,并启动消费者监听
启动方式参考上一篇文章,戳这里~
配置一个控制器,即调用kafka生成消息的入口
/** * 测试控制器 * PS:@RestController 注解: 该注解是 @Controller 和 @ResponseBody 注解的合体版 */ @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private User user; @Autowired private KafkaProducer kafkaProducer; @RequestMapping("/createMsg") public void createMsg() { kafkaProducer.sendUserMessage(user); } }
启动SpringbootApplication,并通过浏览器访问控制器,生成消息
可以看到控制台和消费者窗口都打印了kafka生成的消息。
三、创建kafka消费者类,并通过控制器调用
kafka消费者类
public class KafkaConsumerDemo { @Value("${kafka.topic.user}") private String topicUser;//topic名称 public void consume() { Properties props = new Properties(); // 必须设置的属性 props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "group-user"); // 可选设置属性 //提交方式配置 // 自动提交offset,每1s提交一次(提交后的消息不再消费,避免重复消费问题) props.put("enable.auto.commit", "true");//自动提交offset:true【PS:只有当消息提交后,此消息才不会被再次接受到】 props.put("auto.commit.interval.ms", "1000");//自动提交的间隔 //消费方式配置 /** * earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 * latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 * none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 */ props.put("auto.offset.reset", "earliest ");//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //拉取消息设置 props.put("max.poll.records", "100 ");//每次poll操作最多拉取多少条消息(一般不主动设置,取默认的就好) //根据上面的配置,新增消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅topic-user topic consumer.subscribe(Collections.singletonList(topicUser)); while (true) { // 从服务器开始拉取数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("成功消费消息:topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); }); } } }
重启SpringbootApplication,并通过浏览器访问控制器,消费消息
以上是关于kafka学习Spring Boot 整合 Kafka的主要内容,如果未能解决你的问题,请参考以下文章