07. RabbitMQ消息成功确认机制
Posted 程序员阿红
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了07. RabbitMQ消息成功确认机制相关的知识,希望对你有一定的参考价值。
07. RabbitMQ消息成功确认机制
-
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
-
- 事务机制
- 发布确认机制
1.事务机制
-
AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式
-
并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
-
- channel.txSelect(): 开启事务
- channel.txCommit() :提交事务
-
- channel.txRollback() :回滚事务
-
Spring已经对上面三个方法进行了封装,所以我们只能使用原始的代码演示
2.生产者代码
package trascation;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
/**
* @author WeiHong
* @date 2021 - 09 - 15 19:59
*/
public class Sender
public static void main(String[] args) throws Exception
//1.获取连接
Connection connection = ConnectionUtil.getConnection();
//2.在连接中创建信道
Channel channel = connection.createChannel();
//3.声明路由
channel.exchangeDeclare("trascation_exchange_topic","topic");
//4.发送消息
channel.txSelect(); //开启事务
try
channel.basicPublish("trascation_exchange_topic","user.weihong",null,"商品1-降价".getBytes());
System.out.println(3/0);//模拟异常
channel.basicPublish("trascation_exchange_topic","user.libai",null,"商品2-降价".getBytes());
channel.basicPublish("trascation_exchange_topic","users123.wangwu",null,"商品3-降价".getBytes());
System.out.println("生产者已发送!");
channel.txCommit();//事务提交
catch(Exception e)
System.out.println("由于系统异常,消息全部撤回!");
channel.txRollback();//事务回滚
e.printStackTrace();
finally
channel.close();
connection.close();
3.消费者代码
package trascation;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
* @author WeiHong
* @date 2021 - 09 - 15 20:10
*/
public class Recer1
public static void main(String[] args) throws Exception
//1.创建连接
Connection connection = ConnectionUtil.getConnection();
//2.在连接中创建信道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("trscation_queue_topic1",false,false,false,null);
//4.绑定路由
channel.queueBind("trscation_queue_topic1","trascation_exchange_topic","user.#");
//5.定义内部类接收消息
DefaultConsumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String msg = new String(body);
System.out.println("消费者1="+msg);
;
channel.basicConsume("trscation_queue_topic1",true,consumer);
4.实验结果
以上是关于07. RabbitMQ消息成功确认机制的主要内容,如果未能解决你的问题,请参考以下文章