Source Code Analysis: Spring RabbitMQ Consumers
Posted by Kenny.Wu
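This article walks through the implementation of a Spring RabbitMQ consumer step by step, from startup to receiving messages and running the consumption logic.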
1. How the consumer starts
1.1 Bootstrap configuration class
This class creates the RabbitListenerAnnotationBeanPostProcessor:
@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    .....
}
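1.2 Core logic for creating consumers
The core logic lives in RabbitListenerAnnotationBeanPostProcessor and runs during Spring bean initialization.
For each message listener, a corresponding MessageListenerContainer is created (the default implementation is SimpleMessageListenerContainer).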
// A BeanPostProcessor that creates message listeners after each bean has been initialized
public class RabbitListenerAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
        SmartInitializingSingleton {
    ......
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // Use reflection to find the methods annotated with @RabbitListener (cached per class)
        final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
        for (ListenerMethod lm : metadata.listenerMethods) {
            for (RabbitListener rabbitListener : lm.annotations) {
                // Create a MethodRabbitListenerEndpoint and register it with the RabbitListenerEndpointRegistrar
                processAmqpListener(rabbitListener, lm.method, bean, beanName);
            }
        }
        if (metadata.handlerMethods.length > 0) {
            processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
        }
        return bean;
    }

    protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
    }

    // Core logic for creating a RabbitMQ consumer
    protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
            Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(rabbitListener));
        // resolveQueues() resolves the queue names and also takes care of declaring the queues
        endpoint.setQueueNames(resolveQueues(rabbitListener));
        .......
        // registerEndpoint() hands the endpoint to the registrar, which is where the
        // MessageListenerContainer (SimpleMessageListenerContainer by default) gets created
        this.registrar.registerEndpoint(endpoint, factory);
        ......
    }
}
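The scan-and-register pattern above can be illustrated without Spring. The following is a minimal JDK-only sketch, not Spring AMQP code: QueueListener, Endpoint, Registrar and OrderService are hypothetical stand-ins for @RabbitListener, MethodRabbitListenerEndpoint, RabbitListenerEndpointRegistrar and a user bean. It only shows the shape of the idea: inspect the bean's methods, register one endpoint per annotated method, and return the bean unchanged.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Simplified, JDK-only model of the scan-and-register step; all types are hypothetical.
class ListenerScanSketch {

    // Stand-in for @RabbitListener.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface QueueListener {
        String queue();
    }

    // Stand-in for MethodRabbitListenerEndpoint: bean + method + queue name.
    static final class Endpoint {
        final Object bean;
        final Method method;
        final String queue;

        Endpoint(Object bean, Method method, String queue) {
            this.bean = bean;
            this.method = method;
            this.queue = queue;
        }

        // Invoke the listener method with a message payload.
        Object invoke(String payload) {
            try {
                return this.method.invoke(this.bean, payload);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Listener invocation failed", e);
            }
        }
    }

    // Stand-in for the registrar that collects endpoints.
    static final class Registrar {
        final List<Endpoint> endpoints = new ArrayList<>();

        void register(Endpoint endpoint) {
            this.endpoints.add(endpoint);
        }
    }

    // Same shape as postProcessAfterInitialization: inspect, register, return the bean as-is.
    static Object postProcessAfterInitialization(Object bean, Registrar registrar) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            QueueListener annotation = method.getAnnotation(QueueListener.class);
            if (annotation != null) {
                method.setAccessible(true);
                registrar.register(new Endpoint(bean, method, annotation.queue()));
            }
        }
        return bean;
    }

    // Example bean with one annotated method and one plain method.
    static final class OrderService {
        @QueueListener(queue = "orders")
        String onOrder(String payload) {
            return "handled " + payload;
        }

        void notAListener() {
        }
    }

    public static void main(String[] args) {
        Registrar registrar = new Registrar();
        postProcessAfterInitialization(new OrderService(), registrar);
        Endpoint endpoint = registrar.endpoints.get(0);
        // prints: orders -> handled order-1
        System.out.println(endpoint.queue + " -> " + endpoint.invoke("order-1"));
    }
}
```

The real processor does considerably more (proxy handling, metadata caching, queue declaration), so this sketch should be read only as an outline of the control flow.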
1.3 PS: How does Spring handle a BeanPostProcessor?
Most readers already know the Spring bean initialization flow, but here is a brief recap.
Call chain: getBean -> doGetBean -> createBean -> doCreateBean -> initializeBean
-> applyBeanPostProcessorsBeforeInitialization -> applyBeanPostProcessorsAfterInitialization
public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory
        implements AutowireCapableBeanFactory {

    // Create a bean instance and apply the post-processors
    protected Object createBean(String beanName, RootBeanDefinition mbd, Object[] args) throws BeanCreationException {
        // Various preparation steps
        ......
        // Finally delegate to doCreateBean
        Object beanInstance = doCreateBean(beanName, mbdToUse, args);
        if (logger.isDebugEnabled()) {
            logger.debug("Finished creating instance of bean '" + beanName + "'");
        }
        return beanInstance;
    }

    protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[] args)
            throws BeanCreationException {
        ......
        // Initialize the bean instance.
        Object exposedObject = bean;
        try {
            populateBean(beanName, mbd, instanceWrapper);
            if (exposedObject != null) {
                // Call initializeBean
                exposedObject = initializeBean(beanName, exposedObject, mbd);
            }
        }
        catch (Throwable ex) {
            .....
        }
    }

    // Initialize the bean instance
    protected Object initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd) {
        ......
        if (mbd == null || !mbd.isSynthetic()) {
            wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
        }
        try {
            invokeInitMethods(beanName, wrappedBean, mbd);
        }
        catch (Throwable ex) {
            throw new BeanCreationException(
                    (mbd != null ? mbd.getResourceDescription() : null),
                    beanName, "Invocation of init method failed", ex);
        }
        if (mbd == null || !mbd.isSynthetic()) {
            wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
        }
        return wrappedBean;
    }
}
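The ordering inside initializeBean (before-initialization callbacks, then init methods, then after-initialization callbacks) can be reproduced in a few lines of plain Java. This is a hedged sketch under simplifying assumptions: PostProcessor and Initializable are hypothetical, trimmed-down interfaces rather than Spring's own, and details such as the synthetic-bean check are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// JDK-only model of the initializeBean ordering; the types here are hypothetical, not Spring's.
class InitializeBeanSketch {

    // Trimmed-down counterpart of a bean post-processor.
    interface PostProcessor {
        Object before(Object bean, String beanName);

        Object after(Object bean, String beanName);
    }

    // Counterpart of an init callback (for example an init-method).
    interface Initializable {
        void init();
    }

    // Same three phases as initializeBean: before callbacks, init, after callbacks.
    static Object initializeBean(String beanName, Object bean, List<PostProcessor> processors) {
        Object wrapped = bean;
        for (PostProcessor processor : processors) {
            wrapped = processor.before(wrapped, beanName);
        }
        if (wrapped instanceof Initializable) {
            ((Initializable) wrapped).init();
        }
        for (PostProcessor processor : processors) {
            wrapped = processor.after(wrapped, beanName);
        }
        return wrapped;
    }

    // Runs one bean through the lifecycle and returns the order in which the callbacks fired.
    static List<String> traceLifecycle() {
        List<String> trace = new ArrayList<>();
        PostProcessor recorder = new PostProcessor() {
            @Override
            public Object before(Object bean, String beanName) {
                trace.add("before:" + beanName);
                return bean;
            }

            @Override
            public Object after(Object bean, String beanName) {
                trace.add("after:" + beanName);
                return bean;
            }
        };
        Initializable bean = () -> trace.add("init");
        initializeBean("orderService", bean, Arrays.asList(recorder));
        return trace;
    }

    public static void main(String[] args) {
        // prints: [before:orderService, init, after:orderService]
        System.out.println(traceLifecycle());
    }
}
```

Because the after-initialization callback runs last, a processor such as RabbitListenerAnnotationBeanPostProcessor sees a bean that has already been populated and initialized.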
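2. How RabbitMQ messages are consumed
2.1 SimpleMessageListenerContainer
As described above, consumer startup creates a SimpleMessageListenerContainer. When the container starts, it creates instances of its inner class AsyncMessageProcessingConsumer, one per BlockingQueueConsumer. The inner class implements Runnable, and its core field is a BlockingQueueConsumer. Its run() method uses a while loop to keep receiving messages and to invoke the consumption logic implemented in the method annotated with @RabbitListener.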
@Override
protected void doStart() throws Exception {
    ......
    super.doStart();
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        // Create as many BlockingQueueConsumers as the configured concurrency
        int newConsumers = initializeConsumers();
        ......
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // Run the AsyncMessageProcessingConsumer, which polls the queue for messages and executes the consumption logic
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        for (AsyncMessageProcessingConsumer processor : processors) {
            FatalListenerStartupException startupException = processor.getStartupException();
            if (startupException != null) {
                throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
            }
        }
    }
}
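The startup pattern in doStart() (one queue-backed consumer per unit of concurrency, each driven by a Runnable polling loop on an executor) can be sketched with the JDK alone. MiniContainer below is a hypothetical, heavily simplified model and not the actual SimpleMessageListenerContainer: there is no channel, no acknowledgement and no restart handling, and the deliver() method merely simulates the broker pushing a message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// JDK-only model of "N consumers, N polling loops"; hypothetical, not Spring AMQP code.
class MiniContainerSketch {

    static final class MiniContainer {
        // One in-memory queue per consumer, standing in for BlockingQueueConsumer.
        private final List<BlockingQueue<String>> consumers = new ArrayList<>();
        private final ExecutorService executor;
        private final Consumer<String> listener;
        private volatile boolean active;

        MiniContainer(int concurrency, Consumer<String> listener) {
            this.executor = Executors.newFixedThreadPool(concurrency);
            this.listener = listener;
            for (int i = 0; i < concurrency; i++) {
                this.consumers.add(new LinkedBlockingQueue<>());
            }
        }

        // Counterpart of doStart(): submit one polling loop per consumer.
        void start() {
            this.active = true;
            for (BlockingQueue<String> queue : this.consumers) {
                this.executor.execute(() -> receiveLoop(queue));
            }
        }

        // Counterpart of AsyncMessageProcessingConsumer.run(): poll, then call the listener.
        private void receiveLoop(BlockingQueue<String> queue) {
            try {
                while (this.active) {
                    String message = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        this.listener.accept(message);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Simulates the broker delivering a message to one consumer.
        void deliver(int consumerIndex, String message) {
            this.consumers.get(consumerIndex).add(message);
        }

        void stop() {
            this.active = false;
            this.executor.shutdownNow();
        }
    }

    // Delivers three messages to a two-consumer container and returns what the listener saw, sorted.
    static List<String> runDemo() {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        MiniContainer container = new MiniContainer(2, message -> {
            handled.add(message);
            done.countDown();
        });
        container.start();
        container.deliver(0, "a");
        container.deliver(1, "b");
        container.deliver(0, "c");
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
        List<String> sorted = new ArrayList<>(handled);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Polling with a timeout, rather than blocking indefinitely, lets each loop notice the stop flag promptly, which is also why the real consumer exposes nextMessage(long timeout).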
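2.2 BlockingQueueConsumer
BlockingQueueConsumer decouples message reception from message consumption. On one side, it accepts the messages received by the Channel and puts them into its internal BlockingQueue (the queue field). On the other side, AsyncMessageProcessingConsumer polls it to take messages off that queue and execute the consumption logic.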
// Take the next message from the internal queue
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
    ......
    Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
    if (message == null && this.cancelled.get()) {
        throw new ConsumerCancelledException();
    }
    return message;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    ......
    try {
        // If the BlockingQueueConsumer has been marked as stopping, enqueue with offer(),
        // which waits at most shutdownTimeout and returns false if the queue is still full
        if (BlockingQueueConsumer.this.abortStarted > 0) {
            // If offer() fails, send basic.nack (with requeue) to tell the broker the message was not consumed,
            // then send basic.cancel so the broker stops delivering messages to this consumer
            if (!BlockingQueueConsumer.this.queue.offer(
                    new Delivery(consumerTag, envelope, properties, body, this.queue),
                    BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
                // Defensive - should never happen
                BlockingQueueConsumer.this.queue.clear();
                getChannel().basicNack(envelope.getDeliveryTag(), true, true);
                getChannel().basicCancel(consumerTag);
                try {
                    getChannel().close();
                }
                catch (TimeoutException e) {
                    // no-op
                }
            }
        }
        else {
            // If the BlockingQueueConsumer is not stopping, enqueue with put(),
            // which blocks until space becomes available when the queue is full
            BlockingQueueConsumer.this.queue
                    .put(new Delivery(consumerTag, envelope, properties, body, this.queue));
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
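The difference between the two enqueue branches comes down to standard java.util.concurrent.BlockingQueue semantics: put() blocks until space is available, while offer() with a timeout gives up and returns false. The sketch below illustrates this with a bounded queue of capacity 1. The handOff and nextMessage helpers and the stopping flag are hypothetical names chosen for illustration; they only mirror the shape of handleDelivery and nextMessage, without any channel or nack handling.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of put() versus timed offer(); hypothetical helper names.
class HandoffSketch {

    // Producer side: returns true if the message was enqueued.
    static boolean handOff(BlockingQueue<String> queue, String message, boolean stopping, long timeoutMs) {
        try {
            if (stopping) {
                // While stopping: wait at most timeoutMs, then report failure instead of blocking forever.
                return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
            }
            // Normal operation: block until the consumer side frees up space.
            queue.put(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Consumer side: returns the next message, or null if none arrives within timeoutMs.
    static String nextMessage(BlockingQueue<String> queue, long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(handOff(queue, "m1", false, 50)); // true: queue had room
        System.out.println(handOff(queue, "m2", true, 50));  // false: queue full, offer timed out
        System.out.println(nextMessage(queue, 50));          // m1
        System.out.println(handOff(queue, "m2", true, 50));  // true: space is available again
    }
}
```

In the real consumer, a failed offer() is what triggers the nack, cancel and channel close sequence shown above.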