RabbitMQ发送端事务管理 —— 事务机制 和 确认机制

Posted yifanSJ

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ发送端事务管理 —— 事务机制 和 确认机制相关的知识,希望对你有一定的参考价值。

一、AMQP提供

事务机制,比较消耗性能

try {
    channel.txSelect();
    channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN,
            msg.getBytes());
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4,
                byte[] arg5) throws IOException {
            System.out.println("返回的消息是:" + new String(arg5));
        }
    });
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}

二、RabbitMQ提供

消息确认机制(效率比事务机制高)

try {
    channel.confirmSelect();
    channel.basicPublish(EXCHANGE_NAME, "queue2", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    if(!channel.waitForConfirms()) {
        System.out.println("send message failed");
    }
} catch (Exception e) {
    e.printStackTrace();
}

但是确认模式,是每发送一条消息后就调用channel.waitForConfirms()方法,之后等待服务器的确认,这实际上是一种串行、同步等待的方式,事务机制和它一样(但在实际大数量量测试时,会发现确认机制只比事务机制效率高一点点。是因为同步等待的方式下,confirm机制发送一条消息需要通信交互的命令是2条:Basic.Publish和Basic.Ack;事务机制是3条:Basic.Publish、Tx.Commit/Tx.Commit-Ok或者Tx.Rollback/Tx.Rollback-Ok)

优化方案一:

批量confirm方法,比较简单,不写代码了,就是将channel.basicPublish用循环框起来,发送次数到达一定数量后,进行waitForConfirms()即可。缺点是,如果这批出现返回Basic.Nack或者超时情况,客户端需要将这一批重新发送,当消息经常丢失是,性能不升反降。

注意:要将发送出去的消息存入缓存 中,可以是ArrayList或者BlockingQueue之类的,然后适时的清空,或者重发里面的信息。

优化方案二:

异步confirm方法,提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
        //注意这里需要添加处理消息重发的场景
    }
});
channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

三、总结

1、事务机制和确认机制是互斥的,不能共存。

2、事务机制和确认机制确保的是消息能够正确地发送到RabbitMQ的交换器,如果些交换器没有匹配队列,那么消息也会丢失。

3、普通事务机制和普通confirm的方式吞吐量很低,但方式简单,不需要在客户端维护状态(这里指的是维护deliveryTag及缓存未确认的消息)。

  批量confirm方式的问题在于遇到RabbitMQ服务端返回Basicnack需要重发批量导致的性能降低。

  异步confirm方式和批量confirm一样需要在客户端维护状态。

以上是关于RabbitMQ发送端事务管理 —— 事务机制 和 确认机制的主要内容,如果未能解决你的问题,请参考以下文章

07. RabbitMQ消息成功确认机制

学习RabbitMQ:AMQP事务机制

面试官:引入RabbitMQ后,你如何保证全链路数据100%不丢失?

RabbitMQ之消息确认机制(事务+Confirm)

RabbitMQ之消息确认机制(事务+Confirm)

RabbitMQ事物模式