spring-amqp手动停止RabbitListener

Posted 大扑棱蛾子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-amqp手动停止RabbitListener相关的知识,希望对你有一定的参考价值。

前言

在处理某些业务时,我们可能需要手动停止监听。让消息队列依然可以接收消息,但是先不处理消息。等业务处理完成后再开启监听,处理队列中的消息。所以这里就需要用到RabbitListenerEndpointRegistry中的startstop方法。

停止所有队列

@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

public void stopAll() 
	this.rabbitListenerEndpointRegistry.stop();

这个方法会停止所有队列的监听。

注意官网的这段话
When a container is configured to listen to auto-delete queue(s), or the queue has an x-expires option or the Time-To-Live policy is configured on the Broker, the queue is removed by the broker when the container is stopped (last consumer is cancelled). Before version 1.3, the container could not be restarted because the queue was missing; the RabbitAdmin only automatically redeclares queues etc, when the connection is closed/opens, which does not happen when the container is stopped/started.

如果队列配置的是auto-delete停止监听将删除队列,也就是说一旦停止队列,交换机将无法找到队列,很可能消息会丢失。在1.3版本之前,auto-delete队列停止后是无法重启的,因为队列已经丢失,RabbitAdmin仅在连接打开或关闭的时候重新申明队列。

停止指定的队列

这里要用到rabbitListenerEndpointRegistry.getListenerContainers()方法。源码如下:

	public Collection<MessageListenerContainer> getListenerContainers() 
		return Collections.unmodifiableCollection(this.listenerContainers.values());
	

MessageListenerContainer有下列子类

集成关系如下

AbstractMessageListenerContainer中提供了一个getQueueNames()方法。我们可以通过这个方法获取到监听容器监听了哪些队列。

方法如下:

/**
 * 判断监听器是否监听了指定的队列。
 * @param queueName 队列名称
 * @param listenerContainer 监听容器
 * @return true-监听,false-未监听。
 */
private boolean isQueueListener(String queueName, MessageListenerContainer listenerContainer) 
    if (listenerContainer instanceof AbstractMessageListenerContainer) 
        AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) listenerContainer;
        String[] queueNames = abstractMessageListenerContainer.getQueueNames();
        return ArrayUtils.contains(queueNames, queueName);
    
    return false;

停止指定队列

public boolean stop(String queueName) 
    Collection<MessageListenerContainer> listenerContainers = this.rabbitListenerEndpointRegistry.getListenerContainers();
    for (MessageListenerContainer listenerContainer : listenerContainers) 
        if (this.isQueueListener(queueName, listenerContainer)) 
            listenerContainer.stop();
            return true;
        
    
    return false;

重新启用队列

public void start() 
	this.rabbitListenerEndpointRegistry.start();

也可以获取到指定的监听,然后调用start方法,同stop类似。rabbitListenerEndpointRegistry.start()会启动所有监听,如果监听已启动则忽略。

以上是关于spring-amqp手动停止RabbitListener的主要内容,如果未能解决你的问题,请参考以下文章

Spring AMQP 源码分析 06 - 手动消息确认

Spring-AMQP 和直接回复

spring-cloud-sleuth 与 spring-amqp 集成

spring-cloud-sleuth与spring-amqp集成

text 一次玩3次并停止,手动+自动

markdown 如果没有响应,手动强制服务停止