springboot整合kafka实现消息推送
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合kafka实现消息推送相关的知识,希望对你有一定的参考价值。
参考技术A 本篇文章主要介绍的是springboot整合kafka。1.使用docker安装kafka,移步 https://www.jianshu.com/p/89b19f5b28ec
创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等
1.配置pom文件
pom文件中以父工程作为父依赖,就不需要额外引入依赖了
2.新建一个user实体类
3.创建application-common.yml配置文件,主要添加kafka的公共配置
1.pom文件配置
2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml
3.controller层
创建UserController
4.service层
创建UserService
创建UserServiceImpl
5.创建启动类
1.pom文件
2.创建yml配置文件
3.创建consumer消费者类
4.启动类
启动producer和consumer两个服务模块
访问producer微服务中的接口 http://localhost:8081/api/user/getUser
会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体
本文GitHub源码: https://github.com/lixianguo5097/springboot/tree/master/springboot-kafka
CSDN: https://blog.csdn.net/qq_27682773
: https://www.jianshu.com/u/e99381e6886e
博客园: https://www.cnblogs.com/lixianguo
个人博客: https://www.lxgblog.com `
springboot整合kafka,kafka消息过滤
需求
出于成本考虑,我们只使用了两个topic,测试环境一个,线上一个,然后大家的消息都发到同一个topic里,所以在消费时,就需要实现消息过滤。kafka是没有tag等功能的,所以过滤只能在消费端实现。下面直接上代码
代码
配置文件
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # kafka连接接地址
# client-id: # 发送请求时传给服务器的id
consumer:
topic: TEST_XXX
bootstrap-servers: 127.0.0.1:9092 # 会覆盖spring.kafka.bootstrap-servers 配置
group-id: TEST_GROUP # 消费者所属消息组
key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类
消费过滤代码:
/**
* 消息过滤
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory()
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//设置可以丢弃消息 配合RecordFilterStrategy使用
factory.setAckDiscarded(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setRecordFilterStrategy(new RecordFilterStrategy()
@Override
public boolean filter(ConsumerRecord consumerRecord)
String data = (String) consumerRecord.value();
log.info("filterContainerFactory filter : "+data);
if (data.indexOf("test") != -1)
return false;
//返回true将会被丢弃
return true;
);
return factory;
实际消费代码:
@KafkaListener(topics = "$spring.kafka.consumer.topic",groupId = "$spring.kafka.consumer.group-id",containerFactory = "filterContainerFactory")
public void consumerDatabase(String data, Acknowledgment ack)
log.info("group1接收到消息时间:",dateTime);
//业务处理代码
//手动提交偏移量
ack.acknowledge();
原理:消息过滤器
消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。
配置消息只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),
返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
以上是关于springboot整合kafka实现消息推送的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot整合Kafka消息队列并实现发布订阅和消费