springboot整合kafka实现消息推送

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合kafka实现消息推送相关的知识,希望对你有一定的参考价值。

参考技术A 本篇文章主要介绍的是springboot整合kafka。

1.使用docker安装kafka,移步 https://www.jianshu.com/p/89b19f5b28ec

创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等
1.配置pom文件

pom文件中以父工程作为父依赖,就不需要额外引入依赖了
2.新建一个user实体类

3.创建application-common.yml配置文件,主要添加kafka的公共配置

1.pom文件配置

2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml

3.controller层
创建UserController

4.service层
创建UserService

创建UserServiceImpl

5.创建启动类

1.pom文件

2.创建yml配置文件

3.创建consumer消费者类

4.启动类

启动producer和consumer两个服务模块
访问producer微服务中的接口 http://localhost:8081/api/user/getUser
会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体

本文GitHub源码: https://github.com/lixianguo5097/springboot/tree/master/springboot-kafka

CSDN: https://blog.csdn.net/qq_27682773
: https://www.jianshu.com/u/e99381e6886e
博客园: https://www.cnblogs.com/lixianguo
个人博客: https://www.lxgblog.com `

springboot整合kafka,kafka消息过滤

需求

出于成本考虑,我们只使用了两个topic,测试环境一个,线上一个,然后大家的消息都发到同一个topic里,所以在消费时,就需要实现消息过滤。kafka是没有tag等功能的,所以过滤只能在消费端实现。下面直接上代码

代码

配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # kafka连接接地址
    #    client-id: # 发送请求时传给服务器的id
    consumer:
      topic: TEST_XXX
      bootstrap-servers: 127.0.0.1:9092 # 会覆盖spring.kafka.bootstrap-servers 配置
      group-id: TEST_GROUP # 消费者所属消息组
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类

消费过滤代码:

    /**
     * 消息过滤
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() 
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //设置可以丢弃消息  配合RecordFilterStrategy使用
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        factory.setRecordFilterStrategy(new RecordFilterStrategy() 
            @Override
            public boolean filter(ConsumerRecord consumerRecord) 
                String data = (String) consumerRecord.value();
                log.info("filterContainerFactory filter : "+data);
                if (data.indexOf("test") != -1) 
                    return false;
                
                //返回true将会被丢弃
                return true;
            
        );
        return factory;
    

实际消费代码:

 @KafkaListener(topics = "$spring.kafka.consumer.topic",groupId = "$spring.kafka.consumer.group-id",containerFactory = "filterContainerFactory")
    public void consumerDatabase(String data, Acknowledgment ack)
    log.info("group1接收到消息时间:",dateTime);
   //业务处理代码
    //手动提交偏移量
    ack.acknowledge();

原理:消息过滤器

消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。

配置消息只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),

返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

以上是关于springboot整合kafka实现消息推送的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合Kafka消息队列并实现发布订阅和消费

springboot整合kafka,kafka消息过滤

springboot整合kafka,kafka消息过滤

SpringBoot开发案例之整合Kafka实现消息队列

实用水文篇--SpringBoot整合Netty实现消息推送服务器

Springboot 整合Kafka 实现手动提交 offset