Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用
Posted YQS_Love
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用相关的知识,希望对你有一定的参考价值。
一 概述
通常,我们会通过Spring schemal来配置队列基础设施、队列声明以及绑定等功能,这样让我们能够很方便的通过Spring注入对象去使用它,但有不足之处就是,在项目中如果我们想要多个队列去分担不同的任务,此时我们不得不创建很多不同的Rabbit MQ Spring schemal,那么这种做法就显得太过繁琐与笨重了。反之,在Java代码里动态的去声明和绑定队列,就会方便很多了,而在schemal中我们只需引入Rabbit MQ的相关配置即可。本篇博客会讲解如何在Java代码中动态的声明与绑定队列以及延迟队列的实现。
注意:
- 本篇博客介绍的是Java语言下的使用,客户端使用的是Spring AMQP,版本为1.7.7(详情见pom.xml);
- 本篇博客不使用Rabbit MQ的数据对象转化,如有需要须自行实现;
- 代码中声明的Exchange都为D型的,如果需要别的类型,可自行抽取代码。
二 配置Rabbit MQ
- pom.xm 引入Spring AMQP
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.7.RELEASE</version>
</dependency>
- 在 config.properties(一个自定义的属性配置文件)中配置Rabbit MQ相关,需要在Spring的schemal 中导入
# Rabbit MQ
rabbit.username=test
rabbit.password=123123
rabbit.port=5672
rabbit.host=192.168.30.218
rabbit.virtual.host=/rabbit
- 编写applicationContext-rabbitmq.xml schemal
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="rabbitMqConnectionFactory"
host="$rabbit.host" port="$rabbit.port"
username="$rabbit.username"
password="$rabbit.password"
virtual-host="$rabbit.virtual.host"
channel-cache-size="300"
publisher-confirms="true"/>
<rabbit:template id="rabbitAmqpTemplate" connection-factory="rabbitMqConnectionFactory"/>
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitMqConnectionFactory"/>
</beans>
- 抽取生产者和消费者公共配置接口
IRabbitMqConfig
package com.bell.rabbitmq;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 18:33
* Copyright © Bell All Rights Reserved.
*/
public interface IRabbitMqConfig
/**
* queue name
*
* @return
*/
String queueName();
/**
* queue exchange name
*
* @return
*/
String queueExchangeName();
/**
* queue route key
*
* @return
*/
String queueRouteKey();
- 抽取生产者和消费者公共配置抽象类
AbstractRabbitMqBase
并实现IRabbitMqConfig
接口,但在抽象类型不实现IRabbitMqConfig
接口
package com.bell.rabbitmq;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 18:37
* Copyright © Bell All Rights Reserved.
*/
public abstract class AbstractRabbitMqBase implements IRabbitMqConfig
@Resource
private RabbitAdmin rabbitAdmin;
@Resource
private RabbitTemplate rabbitAmqpTemplate;
@PostConstruct
private void init()
Queue queue = new Queue(queueName());
DirectExchange exchange = new DirectExchange(queueExchangeName());
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(queueRouteKey()));
/**
* 发布字符串信息到队列中
*
* @param
*/
protected void publishMessage(String message)
rabbitAmqpTemplate.convertAndSend(queueExchangeName(), queueRouteKey(), message);
/**
* 发布Message对象信息到队列中
*
* @param message
*/
protected void publishMessage(Message message)
rabbitAmqpTemplate.send(queueExchangeName(), queueRouteKey(), message);
- 抽取消费者公共抽象类并继承
AbstractRabbitMqBase
,同时实现ChannelAwareMessageListener
,此处我们使用ChannelAwareMessageListener
去接收消息,除此之外,我们需要在增加一个抽象方法getConsumerCount()
,用于配置要启用多少个消费者,同时需要实现onDestroy()
方法,在类销毁时去断开与MQ服务器的链接,而不是异常退出,保证消息不丢失或被正常ACK
package com.bell.rabbitmq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.Objects;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 18:50
* Copyright © Bell All Rights Reserved.
*/
public abstract class AbstractRabbitMqConsumer extends AbstractRabbitMqBase implements ChannelAwareMessageListener
@Resource
private ConfigService configService;
@Resource
private ConnectionFactory consumerConnectionFactory;
private SimpleMessageListenerContainer[] rabbitMqListener = null;
@PostConstruct
private void init()
rabbitMqListener = new SimpleMessageListenerContainer[getConsumerCount()];
for (int i = 0; i < getConsumerCount(); ++i)
rabbitMqListener[i] = new SimpleMessageListenerContainer(consumerConnectionFactory);
rabbitMqListener[i].setMessageListener(this);
rabbitMqListener[i].setAcknowledgeMode(AcknowledgeMode.MANUAL);
rabbitMqListener[i].setQueueNames(queueName());
rabbitMqListener[i].start();
/**
* 需要创建多少个消费者
*
* @return
*/
protected abstract int getConsumerCount();
@PreDestroy
private void onDestroy()
if (Objects.isNull(rabbitMqListener) || rabbitMqListener.length <= 0)
return;
for (int i = 0; i < getConsumerCount(); ++i)
rabbitMqListener[i].destroy();
三 使用Rabbit MQ
完成了Rabbit MQ的基本引入与配置后,就可以去使用它了
- 生产者的使用。定义一个生产者类
TestRabbitMqProducer
继承AbstractRabbitMqBase
,然后在需要使用的地方通过Spring注入此类,就可以发送消息到队列里了
package com.bell.rabbitmq.test;
import org.springframework.stereotype.Service;
import bell.util.JSONUtil;
import java.util.Objects;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 16:25
* Copyright © Bell All Rights Reserved.
*/
@Service
public class TestRabbitMqProducer extends AbstractRabbitMqBase
public Boolean publish(String data)
// data 字符串我使用json格式的,这样方便反序列化,当然可以使用Rabbit MQ的convert,由于篇幅有限,不做介绍,请自行实现
this.publishMessage(data);
return true;
@Override
public String queueName()
return "rabbitmq.test.queue";
@Override
public String queueExchangeName()
return "rabbitmq.test.exchange";
@Override
public String queueRouteKey()
return "rabbitmq.test.route.key";
// 对于 queue,exchange,route key等参数可以放到一个常量类中,一处定义,多处可用,还能保证生产者与消费者不一致
- 消费者的使用。创建消费者类
TestRabbitMqConsumer
继承AbstractRabbitMqConsumer
类,并实现所有的方法才可使用,此类只要使用@Service
标注后,就可开始消费,不用做其他的操作
package com.bell.rabbitmq.test;
import org.springframework.stereotype.Service;
import bell.util.JSONUtil;
import java.util.Objects;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 16:25
* Copyright © Bell All Rights Reserved.
*/
@Service
public class TestRabbitMqConsumer extends AbstractRabbitMqConsumer
/**
* onMessage方法中不要抛出异常,否则会阻塞此消费者,导致服务端不在向此消费者推送消息
*/
@Override
public void onMessage(Message message, Channel channel) throws IOException
// 我们发送的消息类型是已bytes数组的形式存在的
String eventMessage = new String(message.getBody());
System.out.println(eventMessage);
// 确认收到消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 确认收到消息,但是业务异常了,需要重回队列
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
@Override
protected int getConsumerCount()
return 1;
@Override
public String queueName()
return "rabbitmq.test.queue";
@Override
public String queueExchangeName()
return "rabbitmq.test.exchange";
@Override
public String queueRouteKey()
return "rabbitmq.test.route.key";
// 对于 queue,exchange,route key等参数可以放到一个常量类中,一处定义,多处可用,还能保证生产者与消费者不一致
到这里,动态的声明和绑定队列就完了, 接下来,我将继续讲解如何配置与使用延迟队列。
四 延迟队列的配置与使用须知
延迟队列的实现有两种方式,第一种是使用插件的方式(详情见官网),第一种方式是通过插件的方式去启用延迟队列,由于插件是官方Rabbit MQ不自带的,所以在Rabbit MQ的后台管理中心是看不到延迟队列信息的,不太利于观察和维护,但使用与否,取决于个人。第二种方式是使用两个队列去实现,一个队列充当消息计时队列,当这些消息变成死信之后,如果不配置死信转发机制,那么这些死信将会被丢弃,反之,要想这些消息被消费,就需要另一个队列接收这些死信,从而让它在次被消费,从而实现延迟的功能。使用本篇博客所配置的延迟队列需要注意以下事项:
1)不对队列设置TTL。
2)不对单个消息设置TTL。也就是说所有的消息得过期时间都是一样的,过期从进入队列开始计算,因消息到达队列有先后顺序,如果同一个队列中的每个消息时间都不一样,那么队头的消息时间还没有过期,而队列中间的消息过期了,队列中间的消息不会立马被转发,只有当他到达队头后才能被转发,因此,为保证过期消息的过期时间不远大于设定的时间,本博客讲解的配置不对队列和单个消息设置不同的时间。
3)因采用的是使用两个队列实现延迟队列,因此需要结合本篇博客前部分的配置,请熟知。
4)延迟队列不得有消费,否则就无法实现延迟功能。
五 延迟队列的配置与使用
- 队列配置参数解释
1)参数x-dead-letter-exchange
为死信转发Exchange;
2)参数x-dead-letter-routing-key
为死信转发Route key;
3)参数x-message-ttl
为消息在队列里的生存时间;
- 创建延迟队列配置抽象类
AbstractRabbitDelayMqBase
并实现IRabbitMqConfig
接口,接口在抽象类中不实现。同时创建messageTtl()
、deadLetterRoutingKey()
、deadLetterExchange()
三个抽象方法,方法作用描述见代码
package com.bell.rabbitmq;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 18:37
* Copyright © Bell All Rights Reserved.
*/
public abstract class AbstractRabbitDelayMqBase implements IRabbitMqConfig
@Resource
private RabbitAdmin producerRabbitAdmin;
@Resource
private RabbitTemplate producerAmqpTemplate;
@PostConstruct
private void init()
// 此处配置延迟队列相关参数
Map<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", deadLetterExchange());
args.put("x-dead-letter-routing-key", deadLetterRoutingKey());
args.put("x-message-ttl", messageTtl());
DirectExchange exchange = new DirectExchange(queueExchangeName());
Queue queue = new Queue(queueName(), true, false, false, args);
producerRabbitAdmin.declareQueue(queue);
producerRabbitAdmin.declareExchange(exchange);
producerRabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(queueRouteKey()));
/**
* 消息生存时间 (单位毫秒)
*
* @return
*/
protected abstract Integer messageTtl();
/**
* 死信交Routing
*
* @return
*/
protected abstract String deadLetterRoutingKey();
/**
* 死信Exchange
*
* @return
*/
protected abstract String deadLetterExchange();
/**
* 发布消息到队列中
*
* @param
*/
protected void publishMessage(String message)
producerAmqpTemplate.convertAndSend(queueExchangeName(), queueRouteKey(), message);
/**
* 发布消息队列中
*
* @param message
*/
protected void publishMessage(Message message)
producerAmqpTemplate.send(queueExchangeName(), queueRouteKey(), message);
- 创建延迟队列生产者类
TestRabbitMqDelayProducer
继承AbstractRabbitDelayMqBase
类,并实现所有方法,在实现方法时需要注意,延迟队列死信转发的Exchange和Route key必须同接收死信队列的Exchange和Route key保持一致,且延迟队列不需要消费者,接收死信的队列的生产者可根据业务需求可有可无
package com.bell.rabbitmq.test;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Service;
import bell.util.JSONUtil;
import javax.annotation.PostConstruct;
import java.util.Objects;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 14:53
* Copyright © Bell All Rights Reserved.
*/
@Service
public class TestRabbitMqDelayProducer extends AbstractRabbitDelayMqBase
/**
* 消息过期时间 (一天)
*/
private static final int MESSAGE_DELAY_TIME = 24 * 3600 * 1000;
public Boolean publishMessage(String data)
// data为json字符串,同样,可以使用Rabbit MQ的convert,自行实现
publishMessage(data);
return true;
@Override
protected Integer messageTtl()
return MESSAGE_DELAY_TIME;
@Override
public String queueName()
return "rabbitmq.test.delay.queue";
@Override
public String queueExchangeName()
return "rabbitmq.test.delay.exchange";
@Override
public String queueRouteKey()
return "rabbitmq.test.delay.route.key";
@Override
protected String deadLetterRoutingKey()
return "rabbitmq.test.delay.receive.route.key";
@Override
protected String deadLetterExchange()
return "rabbitmq.test.delay.receive.exchange";
// 对于 queue,exchange,route key等参数可以放到一个常量类中,一处定义,多处可用,还能保证生产者与消费者不一致
- 创建延迟队列消费者类
TestRabbitMqDelayConsumer
并继承AbstractRabbitMqConsumer
抽象类
package com.bell.rabbitmq.test;
import org.springframework.stereotype.Service;
import bell.util.JSONUtil;
import java.util.Objects;
/**
* @Author: yqs
* @Date: 2019/1/25
* @Time: 16:25
* Copyright © Bell All Rights Reserved.
*/
@Service
public class TestRabbitMqDelayConsumer extends AbstractRabbitMqConsumer
/**
* onMessage方法中不要抛出异常,否则会阻塞此消费者,导致服务端不在向此消费者推送消息
*/
@Override
public void onMessage(Message message, Channel channel) throws IOException
// 我们发送的消息类型是已bytes数组的形式存在的
String eventMessage = new String(message.getBody());
System.out.println(eventMessage);
// 确认收到消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 确认收到消息,但是业务异常了,需要重回队列
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
@Override
protected int getConsumerCount()
return 1;
@Override
public String queueName()
return "rabbitmq.test.delay.receive.queue";
// 此处的 Exchange 和 Route key必须同延迟队列里声明的死信转发保持一致
@Override
以上是关于Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用的主要内容,如果未能解决你的问题,请参考以下文章