kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解

Posted 字母哥哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解相关的知识,希望对你有一定的参考价值。

文章目录

一、@KafkaListener属性概述

KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。

public @interface KafkaListener 

   /**
    * 消费者的id,如果没有配置或默认生成一个。如果配置了会覆盖groupId,笔者的经验这个配置不需要配
    */
   String id() default "";

   /**
    * 配置一个bean,类型为:org.springframework.kafka.config.KafkaListenerContainerFactory
    */
   String containerFactory() default "";

   /**
    * 三选一:该消费者组监听的Topic名称
    */
   String[] topics() default ;

   /**
    * 三选一:通过为消费者组指定表达式匹配监听多个Topic(笔者从来没用过,也不建议使用)
    */
   String topicPattern() default "";

   /**
    * 三选一:消费组指定监听Topic的若干分区。
    */
   TopicPartition[] topicPartitions() default ;

   /**
    * 没用过,不知道作用
    */
   String containerGroup() default "";

   /**
    * Listener的异常处理器,后续会介绍
    * @since 1.3
    */
   String errorHandler() default "";

   /**
    * 消费者组的分组id
    * @since 1.3
    */
   String groupId() default "";

   /**
    * 设否设置id属性为消费组组id
    * @since 1.3
    */
   boolean idIsGroup() default true;

   /**
    * 消费者组所在客户端的客户端id的前缀,用于kafka客户端分类
    * @since 2.1.1
    */
   String clientIdPrefix() default "";

   /**
    * 用于SpEL表达式,获取当前Listener的配置信息
    * 如获取监听Topic列表的SpEL表达式为 : "#__listener.topicList"
    * @return the pseudo bean name.
    * @since 2.1.2
    */
   String beanRef() default "__listener";

   /**
    * 当前消费者组启动多少了消费者线程,并行执行消费动作
    * @since 2.2
    */
   String concurrency() default "";

   /**
    * 是否自动启动,true or false
    * @since 2.2
    */
   String autoStartup() default "";

   /**
    * Kafka consumer 属性配置,支持所有的apache kafka 消费者属性配置
    * 但不包括group.id 和 client.id 配置属性
    * @since 2.2.4
    */
   String[] properties() default ;

   /**
    * 笔者从来没用过,自己理解下面的这段英文吧
    * When false and the return type is an @link Iterable return the result as the
    * value of a single reply record instead of individual records for each element.
    * Default true. Ignored if the reply is of type @code Iterable<Message<?>>.
    * @return false to create a single reply record.
    * @since 2.3.5
    */
   boolean splitIterables() default true;

   /**
    *  笔者从来没用过,自己理解下面的这段英文吧
    * Set the bean name of a
    * @link org.springframework.messaging.converter.SmartMessageConverter (such as the
    * @link org.springframework.messaging.converter.CompositeMessageConverter) to use
    * in conjunction with the
    * @link org.springframework.messaging.MessageHeaders#CONTENT_TYPE header to perform
    * the conversion to the required type. If a SpEL expression is provided
    * (@code #...), the expression can either evaluate to a
    * @link org.springframework.messaging.converter.SmartMessageConverter instance or a
    * bean name.
    * @return the bean name.
    * @since 2.7.1
    */
   String contentTypeConverter() default "";



二、结合自定义配置是实现监听(常规用法)

通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:

myconsumer:
    topic: topic-a,topic-b
    group-id: group-demo
    concurrency: 5

下面的消费者监听器监听了两个topic:topic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。

@KafkaListener(topics = "#'$myconsumer.topic'.split(',')",
        groupId = "$myconsumer.group-id",
        concurrency="$myconsumer.concurrency")
public void readMsg(ConsumerRecord consumerRecord) 
     //监听到数据之后,进行处理操作

三、指定Topic分区及偏移量进行消费

在某些特殊场景下,希望消费Topic主题中的某几个分区(不是全部分区消费)。或者是针对某个分区从指定的偏移量开始消费。

@KafkaListener(topicPartitions =
         @TopicPartition(topic = "topic-a", partitions =  "0", "1" ),
          @TopicPartition(topic = "topic-b", partitions = "0","4",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "300"))
        )
public void readMsg(ConsumerRecord<?, ?> record) 
    


上面例子监听topic-a的0,1分区(可能包含不只2个分区);监听topic-b的第0和4分区 ,并且第1分区从offset为300的开始消费;

五、指定监听器工厂(实现消息过滤)

@Configuration
public class KafkaInitialConfiguration 
 
    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;
 
    // 配置一个消息过滤策略
    @Bean
    public ConcurrentKafkaListenerContainerFactory myFilterContainerFactory() 
        ConcurrentKafkaListenerContainerFactory factory = newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        //设置消息过滤策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() 
            @Override
            public boolean filter(ConsumerRecord consumerRecord) 
                //这里做逻辑判断
                //返回true的消息将会被丢弃
                return true;
            
        );
        return factory;
    


使用方法,myFilterContainerFactory是上文中bean方法的名称。

@KafkaListener(containerFactory ="myFilterContainerFactory")

六、properties配置

除了上面提到的一些配置属性,实际上apache kafka consumer支持的原生配置属性,要比Spring 提供的配置属性多得多。所有的apache kafka原生配置属性都可以通过properties配置来传递。

@KafkaListener(properties = "enable.auto.commit:false","max.poll.interval.ms:6000" )

七、注解方式获取消息头及消息体

  • @Payload:获取的是消息的消息体,也就是发送的数据内容
  • @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key
  • @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的
  • @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName
  • @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取到数据时间的时间戳
@KafkaListener(topics = "topic-a")
public void  readMsg(@Payload String data,
                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) 


八、消息转发

Spring-Kafka只需要通过一个@SendTo注解即可以实现消息的转发,被注解方法的return值即转发的消息内容:

@Component
public class KafkaConsumer 
    // 消费监听
    @KafkaListener(topics = "topic1")
    @SendTo("topic2")
    public String listen1(String data) 
        System.out.println("业务A收到消息:" + data);
        return data + "(已处理)";
    
 
    // 消费监听
    @KafkaListener(topics = "topic2")
    public void listen2(String data) 
        System.out.println("业务B收到消息:" + data);
    

  • 消息发往topic1,topic1处理完成的数据通过SendTo发往topic2
  • topic2如果收到数据,打印业务B收到消息: + 你发送的消息数据

以上是关于kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合Kafka下

kafka专栏针对kafka的简单介绍

SpringBoot SpringBoot整合Kafka

spring boot怎么启动kafka

springboot 集成kafka

SpringBoot中使用kafka