RabbitMQ发送端事务管理 —— 事务机制 和 确认机制
Posted yifanSJ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ发送端事务管理 —— 事务机制 和 确认机制相关的知识,希望对你有一定的参考价值。
一、AMQP提供
事务机制,比较消耗性能
try { channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.addReturnListener(new ReturnListener() { public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5) throws IOException { System.out.println("返回的消息是:" + new String(arg5)); } }); channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); }
二、RabbitMQ提供
消息确认机制(效率比事务机制高)
try { channel.confirmSelect(); channel.basicPublish(EXCHANGE_NAME, "queue2", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); if(!channel.waitForConfirms()) { System.out.println("send message failed"); } } catch (Exception e) { e.printStackTrace(); }
但是确认模式,是每发送一条消息后就调用channel.waitForConfirms()方法,之后等待服务器的确认,这实际上是一种串行、同步等待的方式,事务机制和它一样(但在实际大数量量测试时,会发现确认机制只比事务机制效率高一点点。是因为同步等待的方式下,confirm机制发送一条消息需要通信交互的命令是2条:Basic.Publish和Basic.Ack;事务机制是3条:Basic.Publish、Tx.Commit/Tx.Commit-Ok或者Tx.Rollback/Tx.Rollback-Ok)
优化方案一:
批量confirm方法,比较简单,不写代码了,就是将channel.basicPublish用循环框起来,发送次数到达一定数量后,进行waitForConfirms()即可。缺点是,如果这批出现返回Basic.Nack或者超时情况,客户端需要将这一批重新发送,当消息经常丢失是,性能不升反降。
注意:要将发送出去的消息存入缓存 中,可以是ArrayList或者BlockingQueue之类的,然后适时的清空,或者重发里面的信息。
优化方案二:
异步confirm方法,提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理
channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple); if(multiple) { confirmSet.headSet(deliveryTag-1).clear(); }else { confirmSet.remove(deliveryTag); } } public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple); if(multiple) { confirmSet.headSet(deliveryTag-1).clear(); }else { confirmSet.remove(deliveryTag); } //注意这里需要添加处理消息重发的场景 } }); channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
三、总结
1、事务机制和确认机制是互斥的,不能共存。
2、事务机制和确认机制确保的是消息能够正确地发送到RabbitMQ的交换器,如果些交换器没有匹配队列,那么消息也会丢失。
3、普通事务机制和普通confirm的方式吞吐量很低,但方式简单,不需要在客户端维护状态(这里指的是维护deliveryTag及缓存未确认的消息)。
批量confirm方式的问题在于遇到RabbitMQ服务端返回Basicnack需要重发批量导致的性能降低。
异步confirm方式和批量confirm一样需要在客户端维护状态。
以上是关于RabbitMQ发送端事务管理 —— 事务机制 和 确认机制的主要内容,如果未能解决你的问题,请参考以下文章