RabbitMQ的消息优先
RabbitMQ可以设置队列的优先级,在队列中的高优先级消息会被优先消费。在设置优先级时,首先需要设置队列的最高优先级,然后在生产者发送消息时设置该条消息的优先级,最后在队列中的高优先级的消息会被先发送给消费者消费
设置队列的最高优先级
设置队列的最高优先级在声明队列时进行设置,代码如下:
Map<String, Object> queueArgs = new HashMap<>(1);
queueArgs.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
设置消息的优先级
设置消息的优先级在生产者生成消息时进行设置,代码如下:
BasicProperties properties = new BasicProperties.Builder()
.priority(i)
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message);
注意:当消费者消费速度大于生产端,且Broker中没有消息堆积的话,也就是说当生产者生产一条消息就被消费者消费,消息队列中没有消息堆积的话,设置消息优先级是没有意义的
例子
生产者
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
for (int i = 0; i < 10; i++) {
BasicProperties properties = new BasicProperties.Builder()
.priority(i)
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties,
String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
消费者
Channel channel = connection.createChannel();
channel.exchangeDeclare(PriorityProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> queueArgs = new HashMap<>(1);
queueArgs.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
channel.queueBind(QUEUE_NAME, PriorityProducer.EXCHANGE_NAME, PriorityProducer.ROUTING_KEY);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.print(message + " ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
输出如下: 0 9 8 7 6 5 4 3 2 1
由于消费者设置了消息预取数量为1,所以会先取0消费,然后造成消息在消息队列中的积压,后面取的话就会先取优先级高的消息
RabbitMQ实现延迟消息
RabbitMQ使用AMQP协议,在AMQP协议中没有直接实现延迟消息,所以我们使用死信队列(DLX)和消息存活时间(TTL)模拟出延迟队列
死信队列(DLX)
当消息在队列中变为死信消息(Dead Message)后,该消息会被Publish到该队列的DLX(Dead-Letter-Exchange)中。DLX就是一个Exchange,当消息被发送到DLX后可以路由到队列中进行重新消费
消息在消息队列中变为死信消息的几种情况:
- 消息被拒绝并且不会重新进入队列(requeue=false)
- 消息TTL过期
- 消息队列达到最大长度
在声明队列时设置该队列的死信队列以及发送消息到死信队列的Routing Key,代码如下:
Map<String, Object> queueArgs = new HashMap<>(2);
// 设置死信队列
queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME);
// 设置死信Roting Key,不设置默认使用该Queue的Routing Key
queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY);
channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs);
实现延迟队列
可以通过DDL和DLX实现延迟队列,具体实现逻辑如下:
把消息发送到普通的队列中(该队列设置死信队列),当消息DDL到期后会发送到死信队列中,然后通过消费死信队列中的消息实现延迟队列,示例代码如下:
生产者
Channel channel = connection.createChannel();
channel.exchangeDeclare(PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
BasicProperties properties = new BasicProperties.Builder()
// 设置消息的TTL
.expiration("60000")
.build();
channel.basicPublish(PLAIN_EXCHANGE_NAME, PLAIN_ROUTING_KEY, properties,
"Hello".getBytes(StandardCharsets.UTF_8));
消费者
Channel channel = connection.createChannel();
channel.exchangeDeclare(DelayProducer.PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DelayProducer.DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> queueArgs = new HashMap<>(2);
// 设置死信队列
queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME);
// 设置死信Roting Key,不设置默认使用该Queue的Routing Key
queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY);
channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs);
channel.queueBind(PLAIN_QUEUE_NAME, DelayProducer.PLAIN_EXCHANGE_NAME, DelayProducer.PLAIN_ROUTING_KEY);
channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
channel.queueBind(DLX_QUEUE_NAME, DelayProducer.DLX_EXCHANGE_NAME, DelayProducer.DLX_ROUTING_KEY);
// 由于消息到普通队列中TTL时间内没有消费,所以该消息会被发送到死信队列中,所以我们通过消费死信队列来实现延迟消息
channel.basicConsume(DLX_QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
RabbitMQ消费模式
由前面的消息队列系列文章可以看出来,消费者可以获取消息有Pull、Push模型。RabbitMQ两种模型都支持,但是其对Pull模型支持不太好,需要自己实现轮询查询是否有消息。下面是两种模型的简单使用
Push模型
Push模型是RabbitMQ服务器主动推送消息给Consumer。这种模型有点像设计模式中的时间驱动模式,需要Consumer注册回调接口到RabbitMQ服务器中,当RabbitMQ服务器有消息时会主动回调接口发送消息。Push模型有慢消费的缺点,RabbitMQ通过设置消费者预取消息数量来控制服务器发送消息的速度。我们经常用到就是这种模式,Consumer示例代码如下:
Channel channel = connection.createChannel();
channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false,false, false, null);
channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY);
channel.basicConsumer(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
// 处理消息逻辑
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Pull模型
Pull模型Consumer主动去RabbitMQ服务器拉消息。这种模式的缺点是消息延迟和忙等,需要自己设计轮询方案。Consumer示例代码如下,没有实现轮询方案:
Connection connection = Basic.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false,false, false, null);
channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY);
while (true) {
GetResponse response = channel.basicGet(QUEUE_NAME, false);
if (response == null) {
continue;
}
String message = new String(response.getBody(), StandardCharsets.UTF_8);
// 处理消息逻辑
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
Reference
http://blog.csdn.net/u013256816/article/details/55105495
http://blog.csdn.net/u013256816/article/details/62890189