rabbitmq详解

Posted hmb↑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq详解相关的知识,希望对你有一定的参考价值。

rabbitmq

一、简介

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。

你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

主要流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。

二、业务场景

1、异步

如: 用户注册发送,注册邮件、注册短信,
传统做法

1、串行 (先发送邮件、再发短信)。问题:持续时间长

2、并行(将注册信息写入数据库后,同时发送邮件、短信),速度快、但不能满足高吞吐需求。

消息队列做法
将数据写入数据库、同时发送消息给发送邮件和注册,异步处理

2、应用解耦

如:双十一购物节,用户下单后、订单系统通知库存系统。

传统做法:
订单系统调用库存系统接口。问题:库存接口故障,订单就会失败,而损失大量订单

消息队列做法

订单系统:下单,订单系统完成持久化,将消息写入队列,返回下单成功给用户
库存系统:订阅下单的消息,获取下单消息,进行库操作,就算库存系统故障,消息队列也能保证消息可靠投递,不会导致消息丢失。

3、流量削峰

如:秒杀活动、一般会因为流量过大,导致应用挂掉,一般在应用前端加入消息队列。

作用:1、可以控制活动人数,超过一定阈值,订单直接丢弃
2、可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

消息队列做法
1、用户的请求,服务器收到后,首先写入消息队列,加入消息队列长度最大值,则直接抛弃用户请求或跳转到错误页面
2、秒杀业务根据消息队列中的请求信息,再做后续处理

三、下载

1、docker 安装 rabbitmq

docker pull rabbitmq:3.7.7-management

2、​​启动镜像(用户名和密码设置为 guest guest)

docker run -dit --name rabbitmq3.7.7 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest  -v /home/rabbitmq/data:/var/lib/rabbitmq   -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management

3、访问 rabbitmq 管理界面

http://127.0.0.1:15672 账号密码都是 guest

4、docker 安装 rabbitMQ 延时队列插件(delayed_message_exchange)

下载解压文件 链接:https://pan.baidu.com/s/1PpeOn8NJT4hgh7ZBP0J0OA?pwd=u2gu
提取码:u2gu

拷贝插件文件到 rabbitMQ 的 Docker 容器中
先解压

unzip  unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip 

拷贝插件

docker cp rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbitmq3.7.7:/plugins

进入容器:

docker ps  // 查看启动容器信息
docker exec -it 镜像ID /bin/bash    //开启进入终端

查看插件列表

rabbitmq-plugins list

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

四、界面认识

1、概要

2、连接

3、通道

4、交换机

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

Type解释
direct它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中
fanout它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
headersheaders 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。(headers 类型的交换器性能差,不实用,基本不会使用。)
topic与direct模型相比,多了个可以使用通配符!,这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以"."分割,例如:item.insert ---------星号 匹配一个1词 , 例audit.* ------- #号匹配一个或多个词 audit.#
x-delayed-message延迟交换机,可以延迟接收消息
Features解释
Dd 是 durable 的缩写,代表这个队列中的消息支持持久化
ADad 是 autoDelete 的缩写。代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除。
excl是 exclusive 的缩写。代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
Args是 arguments 的缩写。代表该队列配置了 arguments 参数。
TTL是 x-message-ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。
ExpAuto Expire,是 x-expires 配置的缩写。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp。注意这里是删除队列,不是队列中的消息。
Lim说明该队列配置了 x-max-length。限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉。
Lim B说明队列配置了 x-max-length-bytes。限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小。
DLX说明该队列配置了 x-dead-letter-exchange。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。
DLKx-dead-letter-routing-key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。
Prix-max-priority 的缩写,优先级队列。表明该队列支持优先级,先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
Ovflx-overflow 的缩写。队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息。有两个配置项:drop-head,代表丢弃队列头部的消息,默认行为;reject-publish 设置队列中的消息溢出后,该队列的行为:”拒绝接收”(所有消息)。
ha-all镜像队列。all 表示镜像到集群上的所有节点,ha-params 参数忽略。

5、队列


点击名称进去,可以看到队列的详细信息

get Message可以看到消息的内容

arguments具体参数如下:

参数名作用
x-message-ttl发送到队列的消息在丢弃之前可以存活时间(毫秒)
x-max-length队列最大长度
x-expires队列在被自动删除(毫秒)之前可以使用多长时间
x-max-length-bytes消息容量限制,该参数是非负整数值。该参数和x-max-length目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。
x-dead-letter-exchange设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称
x-dead-letter-routing-key可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,叫使用消息的原始路由密钥
x-max-priority队列支持的最大优先级数;如果未设置,队列将不支持消息优先级
x-queue-mode将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用,如果未设置,队列将保留内存缓存以尽快传递消息
x-queue-master-locator将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则

6、用户

就是添加用户和设置用户权限

五、五种模型示例

0、springboot依赖配置

依赖

<!-- amqp依赖,包含Rabbitmq-->
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

yml配置

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

1、Hello World简单模型

一对一消费,只有一个消费者能接收到

消费者

@Component
public class HolloWordListener 
	// @RabbitListener(queues = ("simple.queue")) // queues需手动先创建队列
	@RabbitListener(queuesToDeclare = @Queue("simple.queue"))  // queuesToDeclare 自动声明队列
	public void holloWordListener(String message)
		System.out.println("message = " + message);
	

生产者

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Test
	public void testSimpleQueue() 
		String queueName = "simple.queue"; // 队列名称
		String message = "heel,simple.queue"; // 要发送的消息
		rabbitTemplate.convertAndSend(queueName,message);
	

2、Work queues工作队列

多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点

消费者

@Component
public class WoekWordListener 

	@RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
	public void holloWordListener(String message) throws InterruptedException 
		Thread.sleep(200);
		System.out.println("message1 = " + message);
	
	
	@RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
	public void holloWordListener1(String message) throws InterruptedException 
		Thread.sleep(400);
		System.out.println("message2 = " + message);
	

生产者

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Test
	public void testWorkQueue()
		String queueName = "workQueue";
		String message = "hello,work.queue__";
		for (int i = 0; i < 10; i++) 
			rabbitTemplate.convertAndSend(queueName,message+i);
			System.out.println("i = " + i);
		
	

取消预取机制,能者多劳配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener: 
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条

3、Publish/Subscribe发布订阅模型

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。
实现方式是加入了exchange(交换机),注意:交换机是不缓存消息的

使用fanout交换机,会将接收到的消息路由到每一个跟其绑定的queue(队列)

消费者

// 消费者直接绑定交换机,指定类型为fanout
@Component
public class FanoutExchangeListener 
	// 不指定队列,消息过了就没了
	//  @RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT)))

	// 指定队列,可以接收缓存到队列里的消息
	@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="test",durable = "true" ),exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT)))
	public void reveivel(String message)
		System.out.println("message = " + message);
	

	@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT)))
	public void reveivel2(String message)
		System.out.println("message1 = " + message);
	

生产者

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Test
	public void tesyPubSubQueue()
		// 参数1:交换机名称 , 参数2routingKey,(fanout类型可不写) , 参数3,消息内容
		rabbitTemplate.convertAndSend("fanoutTest","","消息内容");
	

4、Routing路由模型

routing模型也是将消息发送到交换机

使用的是Direct类型的交换机,会将接收到的消息根据规则路由到指定的Queue(队列),因此称为路由模式

消费者

// 消费者直接绑定交换机,指定类型为direct,并指定key表示能消费的key
@Component
public class RoutingExchangeListener 

	// 不指定队列,消息过了就没了
	//  @RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = "info","error"))
	
	// 指定队列,可以接收缓存到队列里的消息
	// key = "info","error" 表示我能接收到routingKey为 info和error的消息
	@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="test1",durable = "true" ),exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = "info","error"))
	public void receivel(String message)
		System.out.println("message = " + message);
	
	// key = "error" 表示我只能接收到routingKey为 error的消息
	@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = "error"))
	public void receivel1(String message)
		System.out.println("message1 = " + message);
	

生产者

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	// 路由模型
	@Test
	public void direstExchangeTest()
		rabbitTemplate.convertAndSend("direstTest","info","发送info的key的路由消息");
	
	// 路由模型
	@Test
	public void direstExchangeTest1()
		rabbitTemplate.convertAndSend("direstTest","error","发送error的key的路由消息");
	

5、Topics主题模型

topicExchange与directExchange类型,区别在于routingKey必须是多个单词的列表,并且以 . 分隔

*(代表通配符,任意一个字段)
#(号代表一个或多个字段)

消费者

@Component
public class TopicsExchangeListener 

	// 不指定队列,消息过了就没了
	//  @RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = "user.save","user.*"))
	
	// 指定队列,可以接收缓存到队列里的消息
	// key = "user.save","user.*" 表示能消费 routingkey为  user.save 和 user.任意一个字符  的消息
	@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="test2",durable = "true" ),exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = "user.save","user.*"))
	public void recevicel(String message)
		System.out.println("message = " + message);
	
	// key = "order.#","user.*" 表示能消费 routingkey为  order.一个或多个字符   和  user.任意一个字符  的消息
	@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = "order.#","user.*"))
	public void recevicel1(String message)
		System.out.println("message1 = " + message);
	

生产者

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Test
	public void topicTest()
		rabbitTemplate.convertAndSend("topicTest","user.save","topic路由消息,use.save");
	
	
	@Test
	public void topicTest1()
		rabbitTemplate.convertAndSend("topicTest","order.select.getone","topic路由消息,order.select.getone");
	

6、消息转换器

代码里直接发送对象,虽然接收的到消息,但是rabbitmq的界面上看到的消息会是乱码

依赖

 <dependency>
     <groupId>com.fasterxml.jackson.dataformat</groupId>
     <artifactId>jackson-dataformat-xml</artifactId>
     <version>2.9.10</version>
 </dependency>

配置

@Configuration
public class rabbitmqConfig 
 	// 消息转换配置
	@Bean
	public MessageConverter jsonMessageConverter()
		return new Jackson2JsonMessageConverter();
	

再次发送就会是转换好的消息

六、进阶

1、基于插件延迟队列

延迟队列非常常用且好用,可以将消息发送后使消费者延迟接收

RabbitAdmin配置

RabbitAdmin是用于对交换机和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息的组件。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig 

		@Value("$spring.rabbitmq.host")
		private String host;
		@Value("$spring.rabbitmq.username")
		private String username;
		@Value("$spring.rabbitmq.password")
		private String password;
		@Value("$spring.rabbitmq.virtualhost")
		private String virtualhost;
		@Bean
		public ConnectionFactory connectionFactory()
			CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
			connectionFactory.setAddresses(host);
			connectionFactory.setUsername(username);
			connectionFactory.setPassword(password);
			connectionFactory.setVirtualHost(virtualhost);
			return connectionFactory;
		
		@Bean
		public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory)
			RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
			rabbitAdmin.setAutoStartup(true);
			return rabbitAdmin;
		

封装发送延迟队列工具类

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class DelayedQueue 
	// routingKey
	private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
	// 延迟队列交换机
	private static final String DELAYED_EXCHANGE = "delayed.exchange";
	@Autowired
	RabbitTemplate rabbitTemplate;
	@Resource
	RabbitAdmin rabbitAdmin;


	/**
	 * 发送延迟队列
	 * @param queueName 队列名称
	 * @param params 消息内容
	 * @param expiration 延迟时间 毫秒
	 */
	public void sendDelayedQueue(String queueName, Object params, Integer expiration) 
		// 先创建一个队列
		Queue queue = new Queue(queueName);
		rabbitAdmin.declareQueue(queue);
		// 创建延迟队列交换机
		CustomExchange customExchange = createCustomExchange();
		rabbitAdmin.declareExchange(customExchange);
		// 将队列和交换机绑定
		Binding binding &

以上是关于rabbitmq详解的主要内容,如果未能解决你的问题,请参考以下文章

MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

rabbitmq详解

RabbitMQ死信队列

rabbitmq死信队列及延迟队列

RabbitMQ—SpringBoot中实现死信队列

RabbitMQ 中的死信死信消息