RabbitMQ 学习笔记
Posted dingwen_blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 学习笔记相关的知识,希望对你有一定的参考价值。
1.中间件
中间件(
Middleware
)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来。
1.1 特点
- 满足大量应用的需要
- 运行于多种硬件和 OS平台
- 支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
- 支持标准的协议
- 支持标准的接口
1.2消息中间件
利用可靠的消息传递机制进行系统和系统直接的通讯,通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯,是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务。
1.3 消息组成部分
- 协议
- 持久化机制
- 分发策略
- 容错
1.4协议
- 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流2. 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高3. 协议对数据格式和计算机之间交换数据都必须严格遵守规范。
1.5 协议组成
- 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
- 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
- 时序:时序是对事件发生顺序的详细说明
1.6 AMQP
AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。(支持者: RabbitMQ ActiveMQ)
1.7 MQTT
MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分。支持者: RabbitMQ ActiveMQ)
- 轻量
- 结构简单
- 传输快
- 不支持事务
- 不支持持久化
1.8 OpenMessage
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准。
- 结构简单
- 解析速度快
- 事务
- 持久化
1.9 Kafka
Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成。
- 结构简单
- 解析速度快
- 无事务支持
- 有持久化设计
2. 安装
2.1 rpm
安装
https://blog.csdn.net/qq_38020915/article/details/117957570
2.2 docker
安装
https://blog.csdn.net/qq_38020915/article/details/117957001
3.RabbitMQ
3.1 核心概念
- Server:又称Broker,接收客户端连接,实现AMQP服务。
- Connection: 应用程序和Broker的网络连接。TCPIP三次握手四次握手
- Channel: 网络信道,对应一个会话任务,进行消息的读写
- Message: 消息,服务与应用程序之间传送的数据,由Properties(对消息的描述)和body(消息)组成
- Virtual Host: 虚拟地址,用于进行逻辑隔离
- Queue: 队列
- Exchange: 交换机,根据路由键分发到队列
- Bindings: 交换机和队列之间的虚拟连接
- Routing key: 路由规则
3.2 组成
3.3运行流程
3.4 消息模型
-
简单模式
-
工作模式
-
公平分发(能者多劳)
-
producer
跟简单模式一样
-
consumer
-
maven
//简单模式 public class Consumer{ //3.接受内容 //指标定义出来 channel.basicQos(1); channel.basicConsume("queue1",false,new DefaultConsumer(){ public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(new String("收到消息是" + new String(meassage.getBody()),"UTF-8")); //改成手动应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); },new CancelCallback(){ public void handle(String consumerTag) throws IOException { System.out.println("接受失败了"); } }); //4.关闭 channel.close(); connection.close(); }
-
springboot
# TODO 公平分发 0:(默认)轮询 1:公平分发 # listener: # direct: # prefetch: 1 # acknowledge-mode: manual
-
-
-
轮询
跟简单模式一样。
-
-
发布订阅模式
-
路由模式
-
主题模式
-
参数模式
-
发布确认模式
3.5 使用场景、解决问题
- 线程池异步处理(削峰)
- 解耦(模块分离,高内聚,低耦合)
- 消息可靠(持久化)
- 高可用(集群)
3.6 SpringBoot 2.x
整合使用
3.6.1 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.dingwen</groupId>
<artifactId>stu-spr-boo-rab</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dingwen.rabcon</groupId>
<artifactId>rab-con</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rab-con</name>
<description>rab-con</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.6.2 配置(fanout)
package com.dingwen.rabcon.config;
import com.dingwen.rabcon.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq 配置类
*
* @author dingwen
* 2021.06.17 10:36
*/
@Configuration
public class RabbitmqConfiguration {
/**
* 声明fanout模式的交换机
*
* @return {@link FanoutExchange}
*/
@Bean
public FanoutExchange fanoutExchange() {
// 持久化、关闭自动删除
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
}
/**
* 短信任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue smsQueue() {
// 持久化、排他性队列、关闭自动删除
return new Queue(RabbitConstant.FANOUT_QUEUE_SMS_NAME);
}
/**
* 邮件任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue emailQueue() {
// 持久化、排他性队列、关闭自动删除
return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME);
}
/**
* 短信队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
/**
* 邮箱队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
}
3.6.3 producer
package com.dingwen.rabpro.service.impl;
import com.dingwen.rabpro.constant.RabbitConstant;
import com.dingwen.rabpro.service.OrderService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* order service impl
*
* @author dingwen
* 2021.06.17 11:11
*/
@Service
public class OrderServiceImpl implements OrderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void placeOrder() {
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, null, UUID.randomUUID().toString());
}
}
3.6.4 consumer
package com.dingwen.rabcon.service.impl;
import com.dingwen.rabcon.constant.RabbitConstant;
import com.dingwen.rabcon.service.EmailService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* email service impl
*
* @author dingwen
* 2021.06.17 11:22
*/
@Service
@RabbitListener(queues = {RabbitConstant.FANOUT_QUEUE_EMAIL_NAME})
public class EmailServiceImpl implements EmailService {
@Override
@RabbitHandler
public void sendEmail(String message) {
System.out.println("E: message = " + message);
}
}
3.6.5 topic
匹配规则
- “*”: 只有一个一级
- “#”: 至少一个一级
3.7 持久化
3.7.1 消息什么时候需要持久化
- 消息本身在publish的时候就要求消息写入磁盘
- 内存紧张,需要将部分内存中的消息转移到磁盘
3.7.2 消息什么时候会刷到磁盘
- 写入文件前会有一个Buffer,大小为
1M
(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘) - 有个固定的刷盘时间:
25ms
,也就是不管Buffer满不满,每隔25ms
,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘 - 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作
3.7.3 消息保存到磁盘个格式
消息保存于
$MNESIA/msg_store_persistent/x.rdq
文件中,其中x为数字编号,从1开始,每个文件最大为16M
(16777216),超过这个大小会生成新的文件,文件编号加1。
3.7.4 文件何时删除
- 当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率
- publish消息时写入内容,
ack
消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件
3.7.5 存储位置
3.8 排他队列(Exclusive Queue)
只有自己可见的队列,即不允许其它用户访问
- 只对首次声明它的连接(Connection)可见
- 在其连接断开的时候自动删除(包括持久态队列)
- 是限制连接而不是通道
如果试图在一个不同的连接中重新声明或访问(如publish,consume)该排他性队列,会得到资源被锁定的错误
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'UserLogin2'
3.9 阻塞队列
当队列为空后者队列慢的情况下回等待,当队列有空不会空是进行操作。
4.RabbitMQ
高级
4.1 过期时间
可对消息设置过期时间(TTL),表示消息在这个时间内可以被消费。消息过期后被删除或放入死信队列。可对单条消息进行设置,也可以对整个队列进行设置。如果两种方法同时使用则按照TTL小的为准。
4.1.1 消息TTL
package com.dingwen.rabpro.service.impl;
import com.dingwen.rabpro.constant.RabbitConstant;
import com.dingwen.rabpro.service.OrderService;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* order service impl
*
* @author dingwen
* 2021.06.17 11:11
*/
@Service
public class OrderServiceImpl implements OrderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void placeOrder() {
// 测试消息过期时间
MessagePostProcessor messagePostProcessor = (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setContentEncoding("UTF-8");
// 过期时间 5000 毫秒
messageProperties.setExpiration("5000");
return message;
};
// fanout
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, null, UUID.randomUUID().toString(), messagePostProcessor);
// direct topic
// 同上需要指定路由key
}
}
4.1.2 队列TTL
package com.dingwen.rabcon.config;
import com.dingwen.rabcon.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitmq 配置类
*
* @author dingwen
* 2021.06.17 10:36
*/
@Configuration
public class RabbitmqConfigurationFanout {
/**
* 声明fanout模式的交换机
*
* @return {@link FanoutExchange}
*/
@Bean
public FanoutExchange fanoutExchange() {
// 持久化、关闭自动删除
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
}
/**
* 短信任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue smsQueue() {
// 持久化、排他性队列、关闭自动删除
return new Queue(RabbitConstant.FANOUT_QUEUE_SMS_NAME);
}
/**
* 邮件任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue emailQueue() {
// 持久化、排他性队列、关闭自动删除
// TTL队列测试
Map<String, Object> args = new HashMap<>(1);
// 整形 毫秒
args.put("x-message-ttl", 5000);
return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME, true, false, false以上是关于RabbitMQ 学习笔记的主要内容,如果未能解决你的问题,请参考以下文章