SpringBoot系列之集成kafka实现事件发布
Posted smileNicky
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot系列之集成kafka实现事件发布相关的知识,希望对你有一定的参考价值。
事件发布订阅实现,我们经常使用到spring框架提供的ApplicationEventPublisher
,基于kafka的特性,我们也可以简单实现类似的效果
1、kafka环境部署搭建
官网下载链接:https://kafka.apache.org/downloads,最开始用最新版的,发现在我的win10系统没部署成功,所以还是选择2.8.1版本的
在D:\\kafka_2.12-2.8.1\\bin\\windows,使用cmd命令启动zookeeper,window系统修改conf文件夹下面的zookeeper.properties里面的dataDir
zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties
window系统修改conf文件夹下面的log.dirs路径
kafka-server-start.bat ..\\..\\config\\server.properties
2、kafka常用命令使用
启动另外一个cmd参考,创建一个命令为test-topic的topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
查看kafkatopic列表
kafka-topics.bat --list --zookeeper localhost:2181
启动kafka的生产者,发送topic数据
kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic
启动一个kafka消费者端,可以接收到消息数据
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning
3、创建一个kafka starter工程
创建一个工程,实现对kafka的api简单封装
jdk选择jdk8的
选择需要的依赖
基于kafka的EventPublisher
package com.example.ebus.publisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.kafka.core.KafkaTemplate;
@Slf4j
public class MyEventPublisher
private KafkaTemplate<String, Object> kafkaTemplate;
@Value("$app.ebus.topic:ebus")
private String topic;
public MyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate)
this.kafkaTemplate = kafkaTemplate;
public void publishEvent(Object event)
if (log.isInfoEnabled())
log.info("topic发送:", event.getClass().getName());
kafkaTemplate.send(topic, event);
自动配置类
package com.example.ebus.configuration;
import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
@Configuration
public class EbusAutoConfiguration
@Bean
public MyEventPublisher myEventPublisher(@Qualifier("kafkaTemplate") KafkaTemplate<String, Object> kafkaTemplate)
return new MyEventPublisher(kafkaTemplate);
META-INF/spring.factories,加上配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.example.ebus.configuration.EbusAutoConfiguration
4、kafka生产端实现
同样创建一个生产者端工程,引入starter,加上yml配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
写个测试接口测试数据:
package com.example.producer.controller;
import com.example.ebus.event.ShopOrderEvent;
import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/api")
public class ShopOrderController
@Autowired
private MyEventPublisher eventPublisher;
@PostMapping("/order")
public String placeOrder(@RequestBody ShopOrderEvent orderEvent)
eventPublisher.publishEvent(orderEvent);
return orderEvent.getOrderCode();
"orderCode":"123456",
"productName":"三星手机",
"price":1122,
"productDesc":"三星手机"
5、kafka消费者端实现
同样创建一个消费者工程
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
enable-auto-commit: true
group-id: consumer1
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
max-poll-records: 1
properties:
spring:
json:
trusted:
packages: '*'
进行监听,使用kafka的KafkaListener
package com.example.consumer.handler;
import com.example.ebus.event.ShopOrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class OrderListenerHandler
@KafkaListener(topics = "$app.ebus.topic:ebus")
public void obtainTopicData(ShopOrderEvent event)
log.info("下单成功,orderCode:" , event.getOrderCode());
本博客代码例子可以在GitHub找到下载链接
参考资料
以上是关于SpringBoot系列之集成kafka实现事件发布的主要内容,如果未能解决你的问题,请参考以下文章
Debezium系列之:联合主键数据发往kafka topic相同分区
SpringBoot系列之集成logback实现日志打印(篇二)