RabbitMQ 消息队列学习
Posted IT_Holmes
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息队列学习相关的知识,希望对你有一定的参考价值。
文章目录
- 1. 备份交换机
- 2. Rabbitmq的 幂等性
- 3. rabbitmq的 优先级队列
- 4. rabbitmq的 惰性队列
- 5. rabbitmq的 集群
- 6. rabbitmq的 镜像队列
- 6. Haproxy实现高可用负载均衡
- 7. Federation Exchange 联邦交换机
- 8. Federation queue 联邦队列
- 9. Shovel(英文直译:铲子)
1. 备份交换机
添加参数给交换机联系上备份交换机:
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true) //持久化
.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME)//
.build()
配置类:
package com.itholmes.shopping.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 发布确认高级 备份交换机
*/
@Configuration
public class ConfirmConfig
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//RoutingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
/**
* 备份交换机
*/
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
/**
* 备份队列
*/
public static final String BACKUP_QUEUE_NAME = "backup_queue";
/**
* 报警队列
*/
public static final String WARNING_QUEUE_NAME = "warning_queue";
/**
* 声明交换机
* 这里需要将该交换机与备份交换机联系起来。
* 如果rabbitmq服务里面有了该交换机,现在已经该了配置,就要去rabbitmq服务删除该队列,重新创建。
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange()
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true) //持久化
.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME)//
.build();
//声明队列
@Bean("confirmQueue")
public Queue confirmQueue()
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange)
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
/**
* 声明备份交换机
*/
@Bean("backupExchange")
public FanoutExchange backupExchange()
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
/**
* 声明备份队列
*/
@Bean("backupQueue")
public Queue backupQueue()
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
/**
* 声明报警队列
*/
@Bean("warningQueue")
public Queue warningQueue()
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
/**
* 备份队列的绑定
*/
@Bean
public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backupExchange") FanoutExchange backupExchange)
return BindingBuilder.bind(backupQueue).to(backupExchange);
/**
* 报警队列的绑定
*/
@Bean
public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,@Qualifier("backupExchange") FanoutExchange backupExchange)
return BindingBuilder.bind(warningQueue).to(backupExchange);
报警消费者:
package com.itholmes.shopping.consumer;
import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 报警消费者
*/
@Slf4j
@Component
public class WarningConsumer
//接受报警消息
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message)
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:",msg);
生产者:
- 发送一个有可用路由(routingKey),在发送一个不可用路由(routingKey)来测试判断。
package com.itholmes.shopping.controller;
import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布确认高级:生产者 开始发消息 测试确认
*/
@Slf4j
@RestController
public class ProducerController
@Autowired
RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/message")
public void sendMessage(@PathVariable String message)
//对应回调接口的correlationData
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送消息为:",message+"key1");
//模拟队列错误,专门绑定一个不存在的routingkey
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);
log.info("发送消息为:",message+"key2");
注意:
当回退消息和备份交换机同时配置,备份交换机优先级高。
2. Rabbitmq的 幂等性
幂等性就是:
- 就是用户对同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
解决办法:
- 方法一:唯一ID+指纹码机制,利用数据库主键去重。
- 方法二:利用redis的原子性去实现。(最佳策略)
方法一:唯一ID+指纹码机制,利用数据库主键去重。
- 使用时间戳、UUID、或者mq的id来判断,每次消费消息时,用该唯一标识,判断该消息是否已经消费过。
- 指纹码介绍如下:
方法二:利用redis的原子性去实现。(最佳策略)
3. rabbitmq的 优先级队列
3.1 优先级队列的 原理
使用场景:
- 订单催付场景。
优先级队列的范围是0~255 , 越大越优先执行。
3.2 优先级队列 代码实现
页面配置最大优先级:
两个过程:给队列设置最大优先级,给消息设置优先级(不能超过最大优先级)。
代码实现:
-
在声明队列的时候,添加map类型参数,设置该队列最大优先级。
-
生产者消息添加优先级,配置Properties。
注意:
- 测试时,最好先把消费者停止了,这样生产者发送的消息才会在队列中堆积起来,之后启动消费者就能看到测试效果了。
4. rabbitmq的 惰性队列
惰性队列:将消息保存在磁盘中。
正常情况:消息是保存在内存中。
惰性队列,先将消息从磁盘读取到内存,然后在发送给消费者,所以相比较而言,很慢。
但是也有惰性队列的使用场景:
- 当消费者由于各种各样的原因,例如:消费者下线,宕机亦或者是由于维护而关闭,
从致使长时间内不能消费消息造成堆积时(也就是队列堆积很多了消息)
,这种情况就用到了惰性队列。
队列的两种模式:
- default模式,正常模式 。
- lazy模式,惰性模式。
页面添加惰性队列:
代码声明惰性队列:
惰性队列和正常队列的内存开销对比:
- 正常队列快,惰性队列慢。
- 内存开销,如下图,在消息庞大的时候,惰性队列的内存开销小很多!
5. rabbitmq的 集群
下图就是以第一个node1来主体,然后将node2和node3加入到node1中。
- 当然,也可以在node2,node3下面也可以添加mq节点,无论访问那个mq节点,都是相当于访问的这个集群。
首先,准备三台机器,每台机器安装好rabbitmq。
第一步:将他们都配置好vim /etc/hosts 文件的IP和地址名。
第二步:确保各个节点的cookie文件使用的是同一个值。
第三步:启动rabbitmq服务,三台机器都启动。
第四步:将node2和node3与node1联系起来。
第五步:查看集群状态。
第六步:为该集群创建账户。因为之前执行了rabbitmq的reset命令,所有的信息都被重置了,之前的用户也就没有了。
解除集群节点:
6. rabbitmq的 镜像队列
我们在集群里面创建队列,在node1节点创建队列,就在node1节点里面,并不会存在node2,node3中。
这样如果node1宕机了,该队列和队列里面的消息就没了。
这样就用到了镜像队列,将node1的队列复制一份到node2或者node3中。
这时候我们再次创建新队列,名称必须符合匹配规则。
该队列就会备份。在那个node节点备份,由服务器决定。
这样当正常队列出现问题后,镜像队列就会立刻补充上,达到备份效果。
6. Haproxy实现高可用负载均衡
之前所配置的代码,仅仅是连接了一个IP地址的node节点,如果该IP节点宕机了,那么代码就连接不到rabbitmq了。
Haproxy的使用原理图:
- 该方案适合当生产者无法实现连接多台IP服务器的场景。
- 当前rabbitmq集群有多台服务器,但是生产者只能访问其中一个节点,所以可以使用该方案进行架构。
7. Federation Exchange 联邦交换机
7.1 Federation Exchange 准备
实现上面的效果就可以通过Federation Exchange 联邦交换机实现。
开启federation的插件。
安装完成后,服务页面就多了如下部分:
北京broker同步给深圳,那么北京就是上流,深圳是下流。
7.2 联邦交换机的原理
联邦交换机的原理图
7.3 联邦交换机的 实现
按照原理图实现:
第一步:先构建node2节点的downstream下流:
- 在节点node2,创建一个fed_exchange交换机。
- 创建node2_queue,绑定routeKey。
第二步:创建联邦策略。根据pattern正则匹配对应下流交换机。
第三步:在rabbitmq页面,创建一个上游。node1节点为上游,根据联邦策略判断谁为下游。
第四步:查看status,检查是否创建成功。
8. Federation queue 联邦队列
同样一个联邦队列可以连接一个或者多个上游队列。
原理图:
和联邦交换机一样,先创建一个联邦策略:
其他过程也同样。
9. Shovel(英文直译:铲子)
9.1 Shovel 准备
Shovel行为就像优秀的客户端应用程序能够负责连接源端和目的地、负责消息的读写及负责连接失败问题的处理。
其实Shovel的感觉和上下流差不多。
原理图:
- Q1是源端,Q2是目的地。
rabbitmq的开启插件:
(这样Web页面就有了Shovel Status ,Shovel Management的功能。)
9.1 Shovel 的实现
点击Shovel Management添加一个Shovel:
查看添加的Shovel状态:
这样就实现了一个Shovel的效果图。
以上是关于RabbitMQ 消息队列学习的主要内容,如果未能解决你的问题,请参考以下文章