Rabbitmq的一些笔记
Posted 溜溜吃鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq的一些笔记相关的知识,希望对你有一定的参考价值。
目录
一些基本概念:
-
消息队列三大功能:
- 流量消峰:超过极限之后,后续的访问人员需要等待;可以避免宕机,但是需要更多的时间;
- 应用解耦:可以使系统之间解耦,一个系统调用别的系统的时候不会因为被调用的系统故障而一起发生故障;这样在调用的时候,会通过队列去访问别的系统,那么需要调用的系统只需要将这样的一个请求交给了队列则就完成了他的操作,后续的出错不会影响它。
- 异步处理:可以使得模块之间调用的时候,调用的模块不再需要等待被调用的模块执行结束,而是被调用的模块执行结束以后由队列去通知调用的模块;
-
MQ的四大核心概念:
- 生产者;
- 交换机:交换机和队列是绑定的关系,一个交换机可以绑定多个队列;
- 队列:交换机和队列是MQ的重要组成部分;队列和消费者一一对应;
- 消费者;
-
其他的一些:
- 生产者和交换机、队列(broker缓存代理)之间通过connection中的chennel(信道)连接。
基础代码:
-
生产者部分:
/**
* 生产者
*/
public class Producer
public static final String Queue_Name="hello";
public static void main(String[] args) throws IOException, TimeoutException
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//工厂ip 连接mq
connectionFactory.setHost("172.20.10.6");
connectionFactory.setUsername("ljw");
connectionFactory.setPassword("666666");
Connection connection = connectionFactory.newConnection();
//连接需要通过信道发送消息
Channel channel = connection.createChannel();
//通过信道获取队列
//1.队列名、队列中的消息是否需要持久化---默认存内存,持久化后存磁盘;
//2.是否进行消息共享,是否可以被多个消费者共享,true:不共享,false:共享;
//3.是否自动删除
channel.queueDeclare(Queue_Name,false,false,false,null);
String message="hello world";
//1.交换机
//2.路由的key
channel.basicPublish("",Queue_Name,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕");
-
消费者部分:
/**
* 消费者接收消息
*/
public class Consumer
public static final String Queue_Name = "hello";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//工厂ip 连接mq
connectionFactory.setHost("172.20.10.6");
connectionFactory.setUsername("ljw");
connectionFactory.setPassword("666666");
Connection connection = connectionFactory.newConnection();
//连接需要通过信道接受消息
Channel channel = connection.createChannel();
//声明 接收消息
DeliverCallback deliverCallback = (var1, var2) ->
//将byte数组转化为String类型
String message = new String(var2.getBody(), StandardCharsets.UTF_8);;
System.out.println(message);
;
//声明 取消消息
CancelCallback cancelCallback=(var1)->
System.out.println("消费被中断");
;
//1.消费哪个队列
//2.消费成功以后是否自动应答
//3.消费者收到消息的回调
//4.消费者取消消费的回调
channel.basicConsume(Queue_Name, true,deliverCallback, cancelCallback);
综上,可以将获取信道的过程封装到工具类里面:
public class RabbitUtils
public Channel getChannel() throws IOException, TimeoutException
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//工厂ip 连接mq
connectionFactory.setHost("172.20.10.6");
connectionFactory.setUsername("ljw");
connectionFactory.setPassword("666666");
Connection connection = connectionFactory.newConnection();
//连接需要通过信道接受消息
Channel channel = connection.createChannel();
return channel;
工作队列:
- 概念:避免立即执行资源密集型任务,将这些任务封装成消息在后台执行,可以由多个线程一起处理。那么多个线程操作的时候需要保证一条消息只能被处理一次,各个线程轮询工作。
- 测试的时候可以设置允许并行操作,模拟多个线程---同时跑两遍;
- 此时需要生产者发送大量消息:
/** * 生产者 */ public class Producer public static final String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException Channel channel = RabbitUtils.getChannel(); //通过信道获取队列 //1.队列名、队列中的消息是否需要持久化---默认存内存,持久化后存磁盘; //2.是否进行消息共享,是否可以被多个消费者共享,true:不共享,false:共享; //3.是否自动删除 channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="hello world"; //1.交换机 //2.路由的key Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) scanner=new Scanner(System.in); channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送完毕");
运行会发现,消费者是轮询消费多条消息的;
消息应答:
- 如果没有消息应答那么rabbitmq一旦向消费者传递了一条信息,就会立刻将该条消息标记为删除,那么如果一旦有一个消费者挂掉了,那么就会丢失消息;那么为了保证消息在发送过程中不丢失,就产生了消息应答机制,即:消费者在接收到消息并且处理消息之后,会告诉rabbitmq处理好了,此时rabbitmq可以将消息删除了。
-
自动应答:
-
消息发送以后立即被认为已经传送成功,这种情况下,如果消息在接收到之前,消费者的channel关闭了,那么消息就会丢失;另一方面,这种情况没有对传递消息的数量进行限制,那么可能会导致消费者来不及处理消息,导致消息积压内存耗尽,最终使得消费者线程被操作系统杀死。所以这种应答方式仅适用于消费者可以高效并以某种速率处理这些消息的情况下。
-
手动应答:
- Channel.basicAck(用于肯定确认):消费者已经接收到消息并且成功处理了消息,rabbitmq可以丢弃该条消息了。
- Channel.basicNack(用于否认确认):
- Channel.basicReject(用于否认确认):比Channel.basicNack少一个参数--是否批量处理,表示不处理该消息直接拒绝,rabbitmq可以丢弃该条消息了。
- 批量处理:如果此时channel中有多条未应答消息,批量处理为true的话,那么这多条消息都会收到消息应答,如果批量处理为false的话,那么只有最新的一条未应答消息收到消息应答。批量处理的话有可能导致后续消息处理失败,但是rabbitmq已经收到应答从而导致消息的丢失,所以不推荐使用批量处理。
-
消息自动重新入队:
- 如果消费者由于某些原因失去连接(如channel关闭了),导致消息未发送ack确认,那么rabbitmq将会知道消息没有完全处理,就会将消息重新排队,此时如果有其他消费者可以处理,那么将会重新分发给其他消费者,这样就不会导致消息的丢失。
-
代码部分:
- 生产者:
public class Producer //队列名称 private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException Channel channel = RabbitUtils.getChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); String message = "hello world"; while (scanner.hasNext()) scanner = new Scanner(System.in); channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息已发送");
- 消费者:
public class Consumer01 //队列名称 private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException Channel channel = RabbitUtils.getChannel(); System.out.println("C1等待接收消息处理时间较短"); //这里写收到消息后如何消费 DeliverCallback deliverCallback = (var1, var2) -> try Thread.sleep(10000L); catch (InterruptedException e) e.printStackTrace(); String message = new String(var2.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:"+message); //手动应答 //1.消息的标记 tag -->envelope是属性 //2.批量应答 channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); ; //声明 取消消息 CancelCallback cancelCallback=(var1)-> System.out.println("消费被中断"); ; //关闭自动应答,采用手动应答 boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
public class Consumer02 //队列名称 private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException Channel channel = RabbitUtils.getChannel(); System.out.println("C2等待接收消息处理时间较长"); //这里写收到消息后如何消费 DeliverCallback deliverCallback = (var1, var2) -> try Thread.sleep(300000L); catch (InterruptedException e) e.printStackTrace(); String message = new String(var2.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:"+message); //手动应答 //1.消息的标记 tag -->envelope是属性 //2.批量应答 channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); ; //声明 取消消息 CancelCallback cancelCallback=(var1)-> System.out.println("消费被中断"); ; //关闭自动应答,采用手动应答 boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
测试后发现,如果在消息处理的过程中某个消费者挂掉了,那么该条消息会重新入队并且很快分给其他消费者;并且如果开启手动应答以后,会在手动应答执行以后才将消息从队列里删除。
RabbitMQ持久化:
- 以上是保证了传给消费者进行处理的过程中消息不丢失,那么如果是rabbitmq服务停掉以后,生产者发来消息,这个消息如何保证不丢失呢?那么也就是将队列和消息标记为持久化。
-
队列的持久化:
-
将队列申明中的durable参数改为true,那么也就是开启持久化;如果没有开启持久化,那么重启mq以后该队列就不存在了,但是持久化的仍旧存在。
- 特别注意:如果是已经创建了没有开启持久化的队列,那么需要将其删除重新创建持久化的队列,否则会报错;
//申明队列 //第二个参数即durable,true则为开启持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
(开启持久化的队列在feature一列会显示“D”)
-
消息持久化:
-
是在生产者发布消息的时候开启持久化,也就是在props参数加上开启持久化属性(MessageProperties.PERSISTENT_TEXT_PLAIN)。
- 特别注意:将消息标记为持久化并不能完全保证不会丢失消息,这里会存在消息刚准备保存在磁盘的时候,但还没有储存完就宕机了,消息在缓存的一个间隔点,也就是没有真正的写进磁盘,如果需要更强的持久化策略,需要参考发布确认。
//设置开启消息持久化(MessageProperties.PERSISTENT_TEXT_PLAIN) channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
发布确认:
-
不公平发布:
-
轮询分发相当于是一种公平发布,在这种情况下,如果一个消费者处理消息特别快,一个消费者处理消息特别慢,那么处理消息快的消费者大部分时间是空闲的,而处理消息慢的消费者则是一直处于工作状态,所以这样的发布方式不是非常合理,那么为了避免这种情况,我们可以在每个消费者端设置参数channel.basicQos(int prefetchCount=1)(默认是0,也就是轮询分发),从而变为不公平发布,也就相当于能者多劳,也就是一个消费者只能同一时刻只能处理一个消息,要是目前的消息还没处理好,就不会分给他新的消息;注意:此设置需要在手动应答的时候才生效;
-
预取值:
- 也就是指定每个消费者分到几条消息,分配合理可以提高效率;
//consumer01 int prefetchCount=3; channel.basicQos(prefetchCount);
//consumer02 int prefetchCount=3; channel.basicQos(prefetchCount);
-
发布确认原理:
- 发布确认就是存在磁盘上完成以后,mq告知生产者。
- 开启发布确认就可以保证持久化,真的存储在磁盘上(前提是开启了队列、消息的持久化)。
- 开启发布确认:需要在生产者的channel上调用channel.confirmSelect();
//开启发布确认 channel.confirmSelect();
-
单个发布确认:
- 这是一种同步发布确认的方式,发布消息后只有被确认了才会发布下一条;如果指定时间内没有被确认,那么就会抛出异常;缺点:发布速度特别慢。
//单个确认 public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException Channel channel = RabbitUtils.getChannel(); //通过uuid获取一个随机的队列名 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量发消息 for (int i=0;i<MESSAGE_COUNTS;i++) String message=i+""; channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8)); //单个消息就发布确认 boolean flag = channel.waitForConfirms(); if (flag) System.out.println("发布成功"); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNTS+"个单独发布确认消息用时:"+(end - begin)+"ms");
-
单个确认发布耗时如图:
-
批量发布确认:
- 发布一批消息然后一起确认,提高了吞吐量;缺点:如果发布出现问题,就不知道是哪个消息出现的问题。
//批量确认 public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException Channel channel = RabbitUtils.getChannel(); //通过uuid获取一个随机的队列名 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //设置批量确认消息的数量 int batchCount = 100; //批量发消息 for (int i = 1; i <= MESSAGE_COUNTS; i++) String message = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); //判断每到达100条的时候,批量确认一次 if (i % batchCount == 0) boolean flag = channel.waitForConfirms(); if (flag) System.out.println("发布成功"); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNTS + "个批量发布确认消息用时:" + (end - begin) + "ms");
代码如图
-
批量确认发布耗时如图:
-
异步发布确认:
- 这是由broker利用回调函数达到可靠性传递,不再由生产者进行确认,但是生产者和需要进行监听(channel.addConfirmListener(),两个参数的,这样可以监听成功和失败的);
//异步发布确认 public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException Channel channel = RabbitUtils.getChannel(); //通过uuid获取一个随机的队列名 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //设置批量确认消息的数量 int batchCount = 100; //消息确认成功 回调函数 ConfirmCallback ackCallback=(var1,var3)-> System.out.println("已确认消息:"+var1); ; //消息确认失败,回调函数 ConfirmCallback nackCallback=(var1,var3)-> System.out.println("未确认消息:"+var1); ; //准备消息监听器,发送的时候就开始监听--->异步的 channel.addConfirmListener(ackCallback,nackCallback); //批量发消息 for (int i = 1; i <= MESSAGE_COUNTS; i++) String message = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNTS + "个异步发布确认消息用时:" + (end - begin) + "ms");
- 批量确认发布耗时如图:
-
异步发布确认中未确认消息如何处理:
以上是关于Rabbitmq的一些笔记的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ!女朋友看了都会的超详细保姆级附源码笔记!看完还不会请砍我!
RabbitMQ!女朋友看了都会的超详细保姆级附源码笔记!看完还不会请砍我!
RabbitMQ!女朋友看了都会的超详细保姆级附源码笔记!看完还不会请砍我!