SpringBoot框架下RabbitMQ消息接受功能的实现
Posted pipape
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot框架下RabbitMQ消息接受功能的实现相关的知识,希望对你有一定的参考价值。
文章目录
一.概念
RabbitMQ是一个开源的实现AMQP的产品,SpringBoot为了实现RabbitMQ客户端的功能,提供了名字为spring-boot-starter-amqp的starter,开发者在pom或gradle文件中添加这个starter的依赖即可。
二.如何实现接受功能
为实现接受消息的功能,需要向springboot框架注册处理消息的方法,框架为此提供了两种利用java代码注册的方式
1.编程式注册处理消息。注册一个bean,实现RabbitListenerConfigurer接口
@Configuration
public class RabbitListenerConfig
return new RabbitListenerConfigurer()
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar)
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("myQueue1", "myQueue2");
endpoint.setConcurrency("2");
endpoint.setMessageListener(message ->
System.out.println("收到消息"+new String(message.getBody()));
);
registrar.registerEndpoint(endpoint);
2. 注册式处理消息的方法。使用@RabbitListener注解,在注册参数中设置需要监听的队列名字
@Component
public class MyRabbitReceiver
@RabbitListener(queues = "myQueue1", "myQueue2", concurrency = "2")
public void processMessage(Message message)
System.out.println("收到消息"+new String(message.getBody()));
三.为什么能实现接收功能
-
springboot项目启动时,默认扫描根目录下的META-INF/spring.factory文件,然后找到org.springframework.boot.autoconfigure.EnableAutoConfiguration对应的加载类,文件部分内容如下
文件部分内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\ org.springframework.boot.autoconfigure.admin.SpringApplicationAdminJmxAutoConfiguration,\\ org.springframework.boot.autoconfigure.aop.AopAutoConfiguration,\\ org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\\
可以看到框架会自动加载RabbitAutoConfiguration类
-
RabbitAutoConfiguration伪代码如下:
@Configuration //表示这个是一个配置类 @ConditionalOnClass(RabbitTemplate.class, Channel.class) //表示classpath是否存在RabbitTemplate.class, Channel.class @EnableConfigurationProperties(RabbitProperties.class) // 根据配置初始化RabbitProperties.class @Import(RabbitAnnotationDrivenConfiguration.class) // 引入RabbitAnnotationDrivenConfiguration.class,用于初始化配置消息接收功能 public class RabbitAutoConfiguration //初始化ConnectionFactory bean @Configuration @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator //初始化rabbitTemplate, RabbitAdmin bean @Configuration @Import(RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class) protected static class RabbitTemplateConfiguration //初始化RabbitMessagingTemplate bean @Configuration @ConditionalOnClass(RabbitMessagingTemplate.class) @ConditionalOnMissingBean(RabbitMessagingTemplate.class) @Import(RabbitAutoConfiguration.RabbitTemplateConfiguration.class) protected static class MessagingTemplateConfiguration
- 初始化ConnectionFactory
- 初始化rabbitTemplate、rabbitAdmin
- 初始化rabbitMessagingTemplate
RabbitAutoConfiguration还引入RabbitAnnotationDrivenConfiguration类
-
RabbitAnnotationDrivenConfiguration
@Configuration( proxyBeanMethods = false ) @ConditionalOnClass(EnableRabbit.class) class RabbitAnnotationDrivenConfiguration @Bean( name = "rabbitListenerContainerFactory" ) @ConditionalOnMissingBean( name = "rabbitListenerContainerFactory" ) @ConditionalOnProperty( prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true ) SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) @Configuration( proxyBeanMethods = false ) @EnableRabbit @ConditionalOnMissingBean( name = "org.springframework.amqp.rabbit.config.internalRabbitListenerAnnotationProcessor" ) static class EnableRabbitConfiguration EnableRabbitConfiguration()
添加了@EnableRabbit注解
-
@EnableRabbit注解中通过导入RabbitListenerConfigurationSelector后导入了RabbitBootstrapConfiguration
RabbitBootstrapConfiguration中想容器中添加了RabbitListenerAnnotationBeanPostProcessor实例
-
RabbitListenerAnnotationBeanPostProcessor
-
实现了SmartInitializingSingleton的接口,在IOC容器实例加载完成后,会调用afterSingletonsInstantiated方法,对应了编程式的消息接受实现
public void afterSingletonsInstantiated() this.registrar.setBeanFactory(this.beanFactory); if (this.beanFactory instanceof ListableBeanFactory) Map<String, RabbitListenerConfigurer> instances = ((ListableBeanFactory)this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class); Iterator var2 = instances.values().iterator(); while(var2.hasNext()) RabbitListenerConfigurer configurer = (RabbitListenerConfigurer)var2.next(); configurer.configureRabbitListeners(this.registrar);
-
实现了BeanPostProcessor的postProcessAfterInitialization方法,在每个实例初始化完成后会调用postProcessAfterInitialization方法。
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException Class<?> targetClass = AopUtils.getTargetClass(bean); RabbitListenerAnnotationBeanPostProcessor.TypeMetadata metadata = (RabbitListenerAnnotationBeanPostProcessor.TypeMetadata)this.typeCache.computeIfAbsent(targetClass, this::buildMetadata); RabbitListenerAnnotationBeanPostProcessor.ListenerMethod[] var5 = metadata.listenerMethods; int var6 = var5.length; for(int var7 = 0; var7 < var6; ++var7) RabbitListenerAnnotationBeanPostProcessor.ListenerMethod lm = var5[var7]; RabbitListener[] var9 = lm.annotations; int var10 = var9.length; for(int var11 = 0; var11 < var10; ++var11) RabbitListener rabbitListener = var9[var11]; this.processAmqpListener(rabbitListener, lm.method, bean, beanName); if (metadata.handlerMethods.length > 0) this.processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName); return bean;
-
至此mq消息接受功能的实现完成。
后续将会对Springboot和Rabbit如何建立connection,建立多少个connection,connection和channel的关系,接受功能中相关参数的意义及实现,接受消息的方法是否需要考虑线程安全问题,以及AMQP协议的深入学习!
以上是关于SpringBoot框架下RabbitMQ消息接受功能的实现的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot:实现RabbitMQ消息收发(TopicExchange模式)