Rabbit的事务
Posted fuguang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbit的事务相关的知识,希望对你有一定的参考价值。
加入事务的方法:
txSelect() txCommit() txRollback()
生产者:
package com.kf.queueDemo.transactions; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 简单队列 (事务) txSelect() txCommit() txRollback() * @author kf * */ public class TransactionsQueueProducer { //队列名称 private static String QUEUENAME = "TRANSACTIONSQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{ Connection connection = RabbitConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //通道里放入队列 /** * 第一个参数是 队列名称 * 第二个参数指 要不要持久化 */ channel.queueDeclare(QUEUENAME, false, false, false, null); /* //消息体 String mes = "demo_message汉字"; //发送消息 *//** * 参数为 exchange, routingKey, props, body * exchange 交换机 * routingKey 路由键 * * body 消息体 *//* channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/ /** * 集群环境下,多个消费者情况下。消费者默认采用均摊 */ try { //开启事务 channel.txSelect(); String mes = "demo_message汉字"; System.out.println("发送消息"+mes); channel.basicPublish("", QUEUENAME, null, mes.getBytes()); //提交事务 channel.txCommit(); int i = 1/0; } catch (Exception e) { e.printStackTrace(); //事务回滚 channel.txRollback(); System.out.println("生产者发生错误,事务已回滚"); }finally { channel.close(); connection.close(); } } }
消费者:
package com.kf.queueDemo.transactions; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; /** * 简单队列消费者 事务 * @author kf * */ public class TransactionsConsumer { //队列名称 private static String QUEUENAME = "TRANSACTIONSQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{ System.out.println("开始接收消息"); Connection connection = RabbitConnectionUtils.getConnection(); //创建通道 final Channel channel = connection.createChannel(); //通道里放入队列 /** * 第一个参数是 队列名称 * 第二个参数指 要不要持久化 */ channel.queueDeclare(QUEUENAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel){ //监听队列 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("------------进入监听---------"); String s = new String(body, "utf-8"); System.out.println("获取到的消息是:"+s); //手动应答。 /** * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式 */ // channel.basicAck(envelope.getDeliveryTag(), false); } }; //设置应答模式 /** * 参数: 对列名,是否自动签收,监听的类 */ System.out.println("获取消息的方法之前"); channel.basicConsume(QUEUENAME, true, consumer); System.out.println("获取消息的方法之后"); } }
以上是关于Rabbit的事务的主要内容,如果未能解决你的问题,请参考以下文章