springboot整合kafka,kafka消息过滤
Posted 盖丽男
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合kafka,kafka消息过滤相关的知识,希望对你有一定的参考价值。
需求
出于成本考虑,我们只使用了两个topic,测试环境一个,线上一个,然后大家的消息都发到同一个topic里,所以在消费时,就需要实现消息过滤。kafka是没有tag等功能的,所以过滤只能在消费端实现。下面直接上代码
代码
配置文件
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # kafka连接接地址
# client-id: # 发送请求时传给服务器的id
consumer:
topic: TEST_XXX
bootstrap-servers: 127.0.0.1:9092 # 会覆盖spring.kafka.bootstrap-servers 配置
group-id: TEST_GROUP # 消费者所属消息组
key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类
消费过滤代码:
/**
* 消息过滤
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory()
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//设置可以丢弃消息 配合RecordFilterStrategy使用
factory.setAckDiscarded(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setRecordFilterStrategy(new RecordFilterStrategy()
@Override
public boolean filter(ConsumerRecord consumerRecord)
String data = (String) consumerRecord.value();
log.info("filterContainerFactory filter : "+data);
if (data.indexOf("test") != -1)
return false;
//返回true将会被丢弃
return true;
);
return factory;
实际消费代码:
@KafkaListener(topics = "$spring.kafka.consumer.topic",groupId = "$spring.kafka.consumer.group-id",containerFactory = "filterContainerFactory")
public void consumerDatabase(String data, Acknowledgment ack)
log.info("group1接收到消息时间:",dateTime);
//业务处理代码
//手动提交偏移量
ack.acknowledge();
原理:消息过滤器
消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。
配置消息只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),
返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
以上是关于springboot整合kafka,kafka消息过滤的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQSpringBoot 整合 RabbitMQSpringBoot 整合 Kafka)