了解MQ和安装使用RabbitMQ
Posted 肖帆咪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了了解MQ和安装使用RabbitMQ相关的知识,希望对你有一定的参考价值。
什么是消息队列
本质是一个队列,队列中出存放的是跨进程的通信机制,用于上下游传递消息.
MQ是常见的上下游"逻辑解耦 + 物理解耦"的消息通信服务,在使用MQ之后,消息发送上只需要依赖MQ,不用依赖其他服务.
功能
1 流量削峰
👱♀举个例子
系统最多处理一万订单,在正常时段是没问题的,我们下单一秒就能返回结果.
但是在高峰期,如果有两万的下单操作,我们的系统无法处理,只能限制订单超过一万后不允许用户下单.
使用MQ做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的消息
2 应用解耦
以电商应用为例,应用中有订单系统、库存系统、支付系统。用户创建订单后如果耦合调用库存系统和支付系统这些系统,任何一个子系统出了故障,都会造成下单操作异常。
使用MQ,将订单系统和其余系统完成解耦,不必担心其它系统出现故障,当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如库存系统发生故障,需要几分钟修复,在这几分钟,库存系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成,当库存系统恢复后,继续处理订单信息即可,提高系统的可用性。
3 异步处理
有些服务间的调用并不是同步的,而是异步执行,例如,A调用B,B需要花费很长时间执行,此时A需要知道B什么时间可以执行完成,在未使用MQ时,一般会有两种方法实现,1.A不断地轮询查看B是否完成。2、就是A提供一个调用接口,当B执行完成之后,调用A的回调接口,以此实现。
MQ很好的解决这个问题
A调用B后,只需要监听B处理完成消息,当B处理完后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样就省去了A的轮询或者B对A的回调。A也能够即使得到异步处理消息。
MQ分类
AcitveMQ
优点: 单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点: 维护少,高吞吐量的场景较少使用
Kafka
使用场景
他的特点是基于pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
💛 优点: 性能卓越,单机写入TPS约在百万条/秒,吞吐量高,时效性高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。
功能比较简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使。
❌缺点: Kafka单机超过64个队列,Load会发生明显的标高现象,队列越多,load越高,发生消息响应时间变长,使用短轮询的方式,实时性取决于轮询间隔时间,消息失败不支持重试,支持消息顺序,但是一台代理宕机,会产生消息乱序
RocketMQ
使用场景:
为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入后,后端可能无法及时处理的情况。
💛优点: 单机吞吐量十万级,而可用性高,分布式架构,消息可以做到0丢失。
MQ功能完善,还是分布式,支持10亿级别的消息堆积,不会因为堆积导致性能下降
❌缺点: 支持客户端语言不多,目前是java,没有在MQ黑犀牛中实现JMS等接口,有些系统要迁移需要修改大量代码。
RabbitMQ
是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
适应场景
结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便
💛 优点: 由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,支持AJAX文档齐全;开源提供的管理界面;更新频率相当高
RabbitMQ
如何使用
1 安装Erlang语言搭建运行环境
- Erlang下载(下载地址https://www.erlang.org/downloads)并安装
- 配置环境变量: 变量名:ERLANG_HOME 变量值:(自己的安装路径)
然后在系Path变量名选中点击编辑,新建内容 %ERLANG_HOME%\\bin - Win+R,输入erl查看版本
2 安装RabbitMQ
- RabbitMQ去官网下载https://www.rabbitmq.com/download.html 进入页面点击右侧菜单列表中Install: Windows选项,在下载页面找到Direct Downloads下载项选择下载
- RabbitMQ安装,安装步骤和Erlang一样一直下一步就可以,安装完成后RabbitMQ会在系统开始菜单中添加服务快捷键
3 启动服务
找到开始菜单中的RabbitMQ Service - start 如果没有点击展开就可以看到,如果提示没有此服务需要安装服务点击RabbitMQ Service - (re)install安装服务
4 开启web管理界面
- Win+R 输入cmd打开命令行 cd到RabbitMQ安装目录sbin目录下输入下面指令
rabbitmq-plugins.bat enable rabbitmq_management - 重启RabbitMQ服务 先停止服务 点击开始菜单中的 RabbitMQ Service - stop 停止完成后 再次启动 RabbitMQ Service - start
- 重启服务后 在浏览器中输入http://127.0.0.1:15672 进入web管理界面 默认账号密码 guest/guest
5 测试代码
导入依赖(SpringBoot的版本我使用的是2.6.6)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml中添加RabbitMQ的地址:
spring:
rabbitmq:
host: 192.168.18.130
username: admin
password: admin
编写Send消息生产者
public static void main(String[] args)
//定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务地址
connectionFactory.setHost("localhost");
try
//通过工厂获取连接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("hello", false, false, false, null);
//消息
String msg = "Hello World";
channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者已发送" + msg);
catch (IOException | TimeoutException e)
e.printStackTrace();
编写Recv消费者
public class Recv
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception
//定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务地址
connectionFactory.setHost("localhost");
//通过工厂获取连接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/*DefaultConsumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String msg = new String(body);
System.out.println(" 消费者 接收到消息'" + msg + "'");
;
channel.basicConsume(QUEUE_NAME,true,consumer);*/
DeliverCallback deliverCallback = (consumerTag, deliver) ->
String message = new String(deliver.getBody(), "UTF-8");
System.out.println(" 消费者1 接收到消息'" + message + "'");
;
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag ->
);
先启动消费者再启动生产者,web端会有变化,且消费者会受到生产者发来的消息
消息模型
创建一个连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConnectionUtil
public static Connection getConnection() throws IOException, TimeoutException
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//设置账户信息
factory.setUsername("guest");
factory.setPassword("guest");
//通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
基本消息模型
生产者发送消息
public class send
private final static String QUEUE_NAME = "testQueue";
public static void main(String[] args)
try
//通过工厂获取连接
Connection connection = MQConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息
String msg = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者已发送" + msg);
catch (IOException | TimeoutException e)
e.printStackTrace();
消费者消费消息
public class Recv
private final static String QUEUE_NAME = "testQueue";
public static void main(String[] args) throws Exception
//与mq服务建立连接
Connection connection = MQConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DefaultConsumer consumer = new DefaultConsumer(channel)
//获取消息,并且处理,有消息时会自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
//body就是消息体
String msg = new String(body);
System.out.println(" 消费者 接收到消息'" + msg + "'");
;
channel.basicConsume(QUEUE_NAME, true, consumer);
消息的接收与消费使用都需要在一个匿名内部类DefaultConsumer中完成.
注意: 队列需要提前声明,如果未声明就是用队列,则会报错.
生产者和消费者都声明队列,队列的创建会保证幂等性,两个都声明同一个队列,就只会创建一个队列.
WorkQueues 工作队列模型
在基本消息模型中,一个生产者对应一个消费者,而实际生产过程中,往往消息生产会发送很多条消息,如果消费者只有一个的话效率就会很低,因此rabbitmq有另外一种消息模型,这种模型下,一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。
生产者生产20条消息
public class send
private final static String QUEUE_NAME = "testQueue";
public static void main(String[] args)
try
//通过工厂获取连接
Connection connection = MQConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++)
//消息
String msg = "task.." + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者已发送" + msg);
Thread.sleep(500);
channel.close();
connection.close();
catch (IOException | TimeoutException | InterruptedException e)
e.printStackTrace();
消费者1和消费者2
public class Recv1
private final static String QUEUE_NAME = "testQueue";
public static void main(String[] args) throws Exception
//与mq服务建立连接
Connection connection = MQConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DefaultConsumer consumer = new DefaultConsumer(channel)
//获取消息,并且处理,有消息时会自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
//body就是消息体
String msg = new String(body);
System.out.println(" 消费者1 接收到消息'" + msg + "'");
try
Thread.sleep(50);
catch (InterruptedException e)
e.printStackTrace();
;
channel.basicConsume(QUEUE_NAME, true, consumer);
public class Recv2
private final static String QUEUE_NAME = "testQueue";
public static void main(String[] args) throws Exception
//与mq服务建立连接
Connection connection = MQConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DefaultConsumer consumer = new DefaultConsumer(channel)
//获取消息,并且处理,有消息时会自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
//body就是消息体
String msg = new String(body);
System.out.println(" 消费者2 接收到消息'" + msg + "'");
try
Thread.sleep(50);
catch (InterruptedException e)
e.printStackTrace();
;
channel.basicConsume(QUEUE_NAME, true, consumer);
订阅模式
订阅模式中,可以实现一条消息被多个消费者获取。在这种模型下,消息传递过程中比之前多了一个exchange交换机,生产者不是直接发送消息到队列,而是先发送给交换机,经由交换机分配到不同的队列,而每个消费者都有自己的队列:
交换机的类型
- Fanout:广播模式,交换机将消息发送到所有与之绑定的队列中去
- Direct:定向,交换机按照指定的Routing Key发送到匹配的队列中去
- Topics: 通配符,与定向大致相同,不同在于Routing Key可以根据通配符进行匹配
广播模式Fanout
- 多个消费者,独立队列
- 需要与exchange绑定
- 生产者发消息给exchange
- exchange将消息发送到所有绑定的队列中
- 消费者从各自的队列获取消息
生产者
public class Send
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args)
try
Connection connection = MQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String msg = "广播模式1";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("广播发送消息:" + msg);
channel.close();
connection.close();
catch (IOException | TimeoutException e)
e.printStackTrace();
消费者
public class Consumer1
//独立队列
private static final String QUEUE_NAME = "fanout_queue_1";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args)
try
Connection connection = MQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//消费者1声明自己的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//声明Exchange,类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//消费者将队列与交换机绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String msg = new String(body);
System.out.println("消费者1获得消息:" + msg);
);
catch (IOException | TimeoutException e)
e.printStackTrace();
其他消费者只需修改QUEUE_NAME即可
注意:exchange与队列一样都需要提前声明,如果未声明就使用交换机,则会报错。如果不清楚生产者和消费者谁先声明,为了保证不报错,生产者和消费者都声明交换机,同样的,交换机的创建也会保证幂等性。
定向模式Direct
路由模式,可以实现不同的消息被不同队列消费,在direct中,交换机不在将消息发送给所有绑定的队列,而是根据Routing Key将消息发送给指定的队列,队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息也需要携带一个Routing Key.
消费者C1的队列与交换机绑定时设置的Routing Key是“error”, 而C2的队列与交换机绑定时设置的Routing Key包括三个:“info”,“error”,“warning”,假如生产者发送一条消息到交换机,并设置消息的Routing Key为“info”,那么交换机只会将消息发送给C2的队列。
生产者
public class Send
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args)
try以上是关于了解MQ和安装使用RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ 服务异步通信 -- 初识MQ(同步通信和异步通信MQ几种常见MQ的对比)RabbitMQ安装和介绍