springboot整合kafka,kafka消息过滤

Posted 盖丽男

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合kafka,kafka消息过滤相关的知识,希望对你有一定的参考价值。

需求

出于成本考虑,我们只使用了两个topic,测试环境一个,线上一个,然后大家的消息都发到同一个topic里,所以在消费时,就需要实现消息过滤。kafka是没有tag等功能的,所以过滤只能在消费端实现。下面直接上代码

代码

配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # kafka连接接地址
    #    client-id: # 发送请求时传给服务器的id
    consumer:
      topic: TEST_XXX
      bootstrap-servers: 127.0.0.1:9092 # 会覆盖spring.kafka.bootstrap-servers 配置
      group-id: TEST_GROUP # 消费者所属消息组
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类

消费过滤代码:

    /**
     * 消息过滤
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() 
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //设置可以丢弃消息  配合RecordFilterStrategy使用
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        factory.setRecordFilterStrategy(new RecordFilterStrategy() 
            @Override
            public boolean filter(ConsumerRecord consumerRecord) 
                String data = (String) consumerRecord.value();
                log.info("filterContainerFactory filter : "+data);
                if (data.indexOf("test") != -1) 
                    return false;
                
                //返回true将会被丢弃
                return true;
            
        );
        return factory;
    

实际消费代码:

 @KafkaListener(topics = "$spring.kafka.consumer.topic",groupId = "$spring.kafka.consumer.group-id",containerFactory = "filterContainerFactory")
    public void consumerDatabase(String data, Acknowledgment ack)
    log.info("group1接收到消息时间:",dateTime);
   //业务处理代码
    //手动提交偏移量
    ack.acknowledge();

原理:消息过滤器

消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。

配置消息只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),

返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

以上是关于springboot整合kafka,kafka消息过滤的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合kafka,kafka消息过滤

springboot整合kafka实现消息推送

SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQSpringBoot 整合 RabbitMQSpringBoot 整合 Kafka)

SpringBoot整合Kafka消息队列并实现发布订阅和消费

SpringBoot进阶教程(六十二)整合Kafka

springboot整合kafka