RabbitMQ事物模式

Posted xiaofengfree

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ事物模式相关的知识,希望对你有一定的参考价值。

Rabbit的消息确认机制(事务+confirm)
在rabbmitmq中我们可以通过持久化数据解决rabbitmq服务器异常的数据丢失问题
问题:生产者将消息发送出去之后消息到底有没有到达rabbitmq服务器默认的情况是不知道的;

事物两种方式:
AMQP实现了事务机制
Confirm模式

事务机制
txSelect、txCommit、txRollback
txSelect:用户将当前 channel设置成transation横式
txCommit:用于搜交事务
txRollback:回滚事务

一、AMQ模式

AMQ模式耗时,降低Rabbitmq消息吞吐量性能不好
生产者

技术图片
package tx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;

public class TxSend {
    private static String QUE_NAME = "test_que_tx";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        String msg = "hellow tx";
        try {
            channel.txSelect();
            channel.basicPublish("", QUE_NAME, null, msg.getBytes());
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            e.printStackTrace();
        }
        System.out.println("txsend=====>" + msg);
        channel.close();
        connection.close();
    }
}
View Code

消费者

技术图片
package tx;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class TxRecv {
    private static String QUE_NAME = "test_que_tx";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receivetx]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}
View Code

二、Confirm模式

生产者将信道设置成confirm模式,一旦信道进入 confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出, broker回传给生产者的确认消息中 deliver-tag域包含了确认消息的序列号,此外broker也可以设置 basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

Confirm的最大好处是异步

开启方法:channel.confirmSelect()

编程模式有三种方法:

(1)普通发一条 waitForConfirms()

(2)发一批 waitForConfirms()

(3)异步confirm模式:提供回调方法

1)普通模式代码

生产者

技术图片
package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send1 {
    static String QUEUE_NAME = "test_queue_confirm1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.confirmSelect();
        String message = "This is confirm queue";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
        if(!channel.waitForConfirms()) {
            System.out.println("message send failed");
        } else {
            System.out.println("message send success");
        }
        channel.close();
        connection.close();
    }
}
View Code

消息者

技术图片
package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class Recv1 {
    private static String QUE_NAME = "test_queue_confirm1";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receiveconfirm1]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}
View Code

2)批量模式

生产者

技术图片
package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send1 {
    static String QUEUE_NAME = "test_queue_confirm2";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.confirmSelect();
        String message = "This is confirm queue batch";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
        }
        //发送完一起确认
        if(!channel.waitForConfirms()) {
            System.out.println("message send failed");
        } else {
            System.out.println("message send success");
        }
        channel.close();
        connection.close();
    }
}
View Code

消费者

技术图片
package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class Recv1 {
    private static String QUE_NAME = "test_queue_confirm2";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receiveconfirm1]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}
View Code

三、异步模式

 Channel对象提供的ConfirmListener()回调方法只包含 deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个 Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次 handbAck方法,unconfim集合删掉相应的一条(multiple= false)或多条( multiple=tmue)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构

生产者

技术图片
package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class Send3 {
    static String QUEUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.confirmSelect();

        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {//成功调用
                if(multiple) {
                    System.out.println("----------handleAck---------multiple----------");
                    confirmSet.headSet(deliveryTag+1).clear();
                } else {
                    System.out.println("----------handleAck---------multiple----------false");
                    confirmSet.remove(deliveryTag);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {//失败调用
                if(multiple) {
                    System.out.println("----------handleNack---------multiple----------");
                    confirmSet.headSet(deliveryTag+1).clear();
                } else {
                    System.out.println("----------handleNack---------multiple----------false");
                    confirmSet.remove(deliveryTag);
                }
            }
        });
        String message = "This is confirm queue synchronized";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
            confirmSet.add(seqNo);
        }
    }
}
View Code

消费者

技术图片
package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class Recv3 {
    private static String QUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receiveconfirm3]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}
View Code

 














以上是关于RabbitMQ事物模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ工作模式

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

二万字长文图文详解RabbitMQ6 种工作模式(理论与代码相结合)

RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

RabbitMQ