kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解
Posted 字母哥哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解相关的知识,希望对你有一定的参考价值。
文章目录
一、@KafkaListener
属性概述
KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。
public @interface KafkaListener
/**
* 消费者的id,如果没有配置或默认生成一个。如果配置了会覆盖groupId,笔者的经验这个配置不需要配
*/
String id() default "";
/**
* 配置一个bean,类型为:org.springframework.kafka.config.KafkaListenerContainerFactory
*/
String containerFactory() default "";
/**
* 三选一:该消费者组监听的Topic名称
*/
String[] topics() default ;
/**
* 三选一:通过为消费者组指定表达式匹配监听多个Topic(笔者从来没用过,也不建议使用)
*/
String topicPattern() default "";
/**
* 三选一:消费组指定监听Topic的若干分区。
*/
TopicPartition[] topicPartitions() default ;
/**
* 没用过,不知道作用
*/
String containerGroup() default "";
/**
* Listener的异常处理器,后续会介绍
* @since 1.3
*/
String errorHandler() default "";
/**
* 消费者组的分组id
* @since 1.3
*/
String groupId() default "";
/**
* 设否设置id属性为消费组组id
* @since 1.3
*/
boolean idIsGroup() default true;
/**
* 消费者组所在客户端的客户端id的前缀,用于kafka客户端分类
* @since 2.1.1
*/
String clientIdPrefix() default "";
/**
* 用于SpEL表达式,获取当前Listener的配置信息
* 如获取监听Topic列表的SpEL表达式为 : "#__listener.topicList"
* @return the pseudo bean name.
* @since 2.1.2
*/
String beanRef() default "__listener";
/**
* 当前消费者组启动多少了消费者线程,并行执行消费动作
* @since 2.2
*/
String concurrency() default "";
/**
* 是否自动启动,true or false
* @since 2.2
*/
String autoStartup() default "";
/**
* Kafka consumer 属性配置,支持所有的apache kafka 消费者属性配置
* 但不包括group.id 和 client.id 配置属性
* @since 2.2.4
*/
String[] properties() default ;
/**
* 笔者从来没用过,自己理解下面的这段英文吧
* When false and the return type is an @link Iterable return the result as the
* value of a single reply record instead of individual records for each element.
* Default true. Ignored if the reply is of type @code Iterable<Message<?>>.
* @return false to create a single reply record.
* @since 2.3.5
*/
boolean splitIterables() default true;
/**
* 笔者从来没用过,自己理解下面的这段英文吧
* Set the bean name of a
* @link org.springframework.messaging.converter.SmartMessageConverter (such as the
* @link org.springframework.messaging.converter.CompositeMessageConverter) to use
* in conjunction with the
* @link org.springframework.messaging.MessageHeaders#CONTENT_TYPE header to perform
* the conversion to the required type. If a SpEL expression is provided
* (@code #...), the expression can either evaluate to a
* @link org.springframework.messaging.converter.SmartMessageConverter instance or a
* bean name.
* @return the bean name.
* @since 2.7.1
*/
String contentTypeConverter() default "";
二、结合自定义配置是实现监听(常规用法)
通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:
myconsumer:
topic: topic-a,topic-b
group-id: group-demo
concurrency: 5
下面的消费者监听器监听了两个topic:topic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。
@KafkaListener(topics = "#'$myconsumer.topic'.split(',')",
groupId = "$myconsumer.group-id",
concurrency="$myconsumer.concurrency")
public void readMsg(ConsumerRecord consumerRecord)
//监听到数据之后,进行处理操作
三、指定Topic分区及偏移量进行消费
在某些特殊场景下,希望消费Topic主题中的某几个分区(不是全部分区消费)。或者是针对某个分区从指定的偏移量开始消费。
@KafkaListener(topicPartitions =
@TopicPartition(topic = "topic-a", partitions = "0", "1" ),
@TopicPartition(topic = "topic-b", partitions = "0","4",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "300"))
)
public void readMsg(ConsumerRecord<?, ?> record)
上面例子监听topic-a
的0,1分区(可能包含不只2个分区);监听topic-b
的第0和4分区 ,并且第1分区从offset为300的开始消费;
五、指定监听器工厂(实现消息过滤)
@Configuration
public class KafkaInitialConfiguration
// 监听器工厂
@Autowired
private ConsumerFactory consumerFactory;
// 配置一个消息过滤策略
@Bean
public ConcurrentKafkaListenerContainerFactory myFilterContainerFactory()
ConcurrentKafkaListenerContainerFactory factory = newConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
//设置消息过滤策略
factory.setRecordFilterStrategy(new RecordFilterStrategy()
@Override
public boolean filter(ConsumerRecord consumerRecord)
//这里做逻辑判断
//返回true的消息将会被丢弃
return true;
);
return factory;
使用方法,myFilterContainerFactory是上文中bean方法的名称。
@KafkaListener(containerFactory ="myFilterContainerFactory")
六、properties配置
除了上面提到的一些配置属性,实际上apache kafka consumer支持的原生配置属性,要比Spring 提供的配置属性多得多。所有的apache kafka原生配置属性都可以通过properties配置来传递。
@KafkaListener(properties = "enable.auto.commit:false","max.poll.interval.ms:6000" )
七、注解方式获取消息头及消息体
- @Payload:获取的是消息的消息体,也就是发送的数据内容
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的
- @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName
- @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取到数据时间的时间戳
@KafkaListener(topics = "topic-a")
public void readMsg(@Payload String data,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts)
八、消息转发
Spring-Kafka只需要通过一个@SendTo注解即可以实现消息的转发,被注解方法的return值即转发的消息内容:
@Component
public class KafkaConsumer
// 消费监听
@KafkaListener(topics = "topic1")
@SendTo("topic2")
public String listen1(String data)
System.out.println("业务A收到消息:" + data);
return data + "(已处理)";
// 消费监听
@KafkaListener(topics = "topic2")
public void listen2(String data)
System.out.println("业务B收到消息:" + data);
- 消息发往topic1,topic1处理完成的数据通过SendTo发往topic2
- topic2如果收到数据,打印
业务B收到消息:
+ 你发送的消息数据
以上是关于kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解的主要内容,如果未能解决你的问题,请参考以下文章