RabbitMQ持久化
Posted marklogzhu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ持久化相关的知识,希望对你有一定的参考价值。
在项目中,有时候需要消息保障100%投递,我们来看下 RabbitMQ 是怎么支持的
一、RabbitMQ 持久化配置
1.1 交换机持久化配置
设置 durable 属性为 true。
实例:
String exchangeType = "topic";
String exchangeName = "persistenceExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
1.2 队列持久化配置
设置 durable 属性为 true。
实例:
String queueName = "persistenceQueue";
channel.queueDeclare(queueName, false, false, false, null);
1.3 消息持久化配置
设置消息属性 Delivery mode 为 2 。
实例:
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder()
// 设置消息是否持久化
.deliveryMode(2)
.build();
channel.basicPublish(exchangeName, routingKey, properties, "持久化消息".getBytes());
二、生产者确认
标记为持久化后的消息也不能完全保证不会丢失。当 RabbitMQ 接收到生产者的消息,但是还没有来得及保存到磁盘上,服务器就挂了(比如机房断电),那么重启后,RabbitMQ 中的这条未及时保存的消息就会丢失。因为RabbitMQ 不做实时立即的磁盘同步(fsync)。这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。如果需要更强大的保证,那么就需要使用生产者确认反馈机制。
2.1 confirm 确认机制
实现原理
生产者将信道设置成 confirm 模式 ,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理;
confirm 模式 最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息;
实例
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
String exchangeType = "topic";
// 4.声明交换机
String exchangeName = "confirmExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5.申明队列
String queueName = "confirmQueue";
channel.queueDeclare(queueName, false, false, false, null);
// 6.将交换机和队列进行绑定关系
String routingKey1 = "#";
channel.queueBind(queueName, exchangeName, routingKey1);
// 7.循环消费
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
System.err.println("消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端消费: " + msg);
}
}
}
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.开启消息确认模式
channel.confirmSelect();
// 5.声明交换机
String exchangeName = "confirmExchange";
// 6.发送消息
channel.basicPublish(exchangeName, "message", null, "确认事件消息".getBytes());
// 7. 添加确认监听
channel.addConfirmListener(new ConfirmListener() {
/**发送成功回调**/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println(" deliveryTag=" + deliveryTag);
System.err.println(" multiple=" + multiple);
System.err.println("-------发送消息至MQServer成功!-----------");
}
/**发送失败回调**/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println(" deliveryTag=" + deliveryTag);
System.err.println(" multiple=" + multiple);
System.err.println("-------发送消息至MQServer失败!-----------");
}
});
}
}
先启动消费者,然后启动生产者,观察控制台输出:
消费者控制台输出:
消费端启动
消费端消费: 确认事件消息
生产者控制台输出:
deliveryTag=1
multiple=false
-------发送消息至MQServer成功!-----------
2.2 Return 消息机制
Return 消息是用于处理一些不可路由的消息!生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息时,当前的exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,就要使用 return Listener。
原理
在基础 API 中有一个关键的配置项 Mandatory ,如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,则 broker 端自动删除该消息。
实例: routingKey 不存在
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
channel.confirmSelect();
// 4.监听 ReturnListener
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
// 5.声明交换机,使用 defaultExchange
String exchangeName = "";
// 6.发送消息
// 配置Mandatory为 true,否则 RabbitMQ 会自动删除该消息,ReturnListener 无法监听
channel.basicPublish(exchangeName, "business.a",true, null, "ReturnListener消息".getBytes());
}
}
启动生产者,观察控制台输出:
replyCode: 312
replyText: NO_ROUTE
exchange:
routingKey: business.a
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: ReturnListener消息
注:当 exchange 不存在的时候, return Listener 未被回调,暂不知道原因。
三、消费者手工签收
RabbitMQ 服务器将消息发送给消费者后,如果没有消费者没有返回确认信息的话, RabbitMQ 服务器将会持续发送直至这个消息被消费掉或过期。
消费者确认有两种方式:
- 自动确认模式(automatic acknowledgement model):当 RabbbitMQ 将消息发送给应用后,消费者端自动回送一个确认消息,此时 RabbitMQ 删除此消息。
- 显式确认模式(explicit acknowledgement model):消费者收到消息后,可以在执行一些逻辑后,消费者自己决定什么时候发送确认回执(acknowledgement),RabbitMQ 收到回执后才删除消息,这样就保证消费端不会丢失消息
3.1自动确认
自动确认模式 只需要设置 autoAck = true 即可。
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
3.2 手动确认
- 设置关闭自动确认 autoAck = false 即可。
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, queueingConsumer);
- 手动确认
// 消费成功确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 消费失败,重回队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
// 消费失败,丢弃消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
//或者使用 basicReject
// 消费失败,重回队列
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
// 消费失败,丢弃消息
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
以上是关于RabbitMQ持久化的主要内容,如果未能解决你的问题,请参考以下文章