了解MQ和安装使用RabbitMQ

Posted 肖帆咪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了了解MQ和安装使用RabbitMQ相关的知识,希望对你有一定的参考价值。

什么是消息队列

本质是一个队列,队列中出存放的是跨进程的通信机制,用于上下游传递消息.

MQ是常见的上下游"逻辑解耦 + 物理解耦"的消息通信服务,在使用MQ之后,消息发送上只需要依赖MQ,不用依赖其他服务.

功能

1 流量削峰

👱‍♀举个例子

系统最多处理一万订单,在正常时段是没问题的,我们下单一秒就能返回结果.

但是在高峰期,如果有两万的下单操作,我们的系统无法处理,只能限制订单超过一万后不允许用户下单.

使用MQ做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的消息

2 应用解耦

以电商应用为例,应用中有订单系统、库存系统、支付系统。用户创建订单后如果耦合调用库存系统和支付系统这些系统,任何一个子系统出了故障,都会造成下单操作异常。

使用MQ,将订单系统和其余系统完成解耦,不必担心其它系统出现故障,当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如库存系统发生故障,需要几分钟修复,在这几分钟,库存系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成,当库存系统恢复后,继续处理订单信息即可,提高系统的可用性。

3 异步处理

有些服务间的调用并不是同步的,而是异步执行,例如,A调用B,B需要花费很长时间执行,此时A需要知道B什么时间可以执行完成,在未使用MQ时,一般会有两种方法实现,1.A不断地轮询查看B是否完成。2、就是A提供一个调用接口,当B执行完成之后,调用A的回调接口,以此实现。

MQ很好的解决这个问题

A调用B后,只需要监听B处理完成消息,当B处理完后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样就省去了A的轮询或者B对A的回调。A也能够即使得到异步处理消息。

MQ分类

AcitveMQ

优点: 单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据

缺点: 维护少,高吞吐量的场景较少使用

Kafka

使用场景

他的特点是基于pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。

💛 优点: 性能卓越,单机写入TPS约在百万条/秒,吞吐量高,时效性高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。

功能比较简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使。

❌缺点: Kafka单机超过64个队列,Load会发生明显的标高现象,队列越多,load越高,发生消息响应时间变长,使用短轮询的方式,实时性取决于轮询间隔时间,消息失败不支持重试,支持消息顺序,但是一台代理宕机,会产生消息乱序

RocketMQ

使用场景:

为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入后,后端可能无法及时处理的情况。

💛优点: 单机吞吐量十万级,而可用性高,分布式架构,消息可以做到0丢失。

MQ功能完善,还是分布式,支持10亿级别的消息堆积,不会因为堆积导致性能下降

❌缺点: 支持客户端语言不多,目前是java,没有在MQ黑犀牛中实现JMS等接口,有些系统要迁移需要修改大量代码。

RabbitMQ

是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

适应场景

结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便

💛 优点: 由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,支持AJAX文档齐全;开源提供的管理界面;更新频率相当高

RabbitMQ

如何使用

1 安装Erlang语言搭建运行环境

  1. Erlang下载(下载地址https://www.erlang.org/downloads)并安装
  2. 配置环境变量: 变量名:ERLANG_HOME 变量值:(自己的安装路径)
    然后在系Path变量名选中点击编辑,新建内容 %ERLANG_HOME%\\bin
  3. Win+R,输入erl查看版本

2 安装RabbitMQ

  1. RabbitMQ去官网下载https://www.rabbitmq.com/download.html 进入页面点击右侧菜单列表中Install: Windows选项,在下载页面找到Direct Downloads下载项选择下载
  2. RabbitMQ安装,安装步骤和Erlang一样一直下一步就可以,安装完成后RabbitMQ会在系统开始菜单中添加服务快捷键

3 启动服务

找到开始菜单中的RabbitMQ Service - start 如果没有点击展开就可以看到,如果提示没有此服务需要安装服务点击RabbitMQ Service - (re)install安装服务

4 开启web管理界面

  1. Win+R 输入cmd打开命令行 cd到RabbitMQ安装目录sbin目录下输入下面指令
    rabbitmq-plugins.bat enable rabbitmq_management
  2. 重启RabbitMQ服务 先停止服务 点击开始菜单中的 RabbitMQ Service - stop 停止完成后 再次启动 RabbitMQ Service - start
  3. 重启服务后 在浏览器中输入http://127.0.0.1:15672 进入web管理界面 默认账号密码 guest/guest

5 测试代码

导入依赖(SpringBoot的版本我使用的是2.6.6)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml中添加RabbitMQ的地址:

spring:
  rabbitmq:
    host: 192.168.18.130
    username: admin
    password: admin

编写Send消息生产者

public static void main(String[] args) 
    //定义连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置服务地址
    connectionFactory.setHost("localhost");
    try 
        //通过工厂获取连接
        Connection connection = connectionFactory.newConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare("hello", false, false, false, null);
        //消息
        String msg = "Hello World";
        channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("生产者已发送" + msg);
     catch (IOException | TimeoutException e) 
        e.printStackTrace();
    

编写Recv消费者

public class Recv 

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception 
        //定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置服务地址
        connectionFactory.setHost("localhost");

        //通过工厂获取连接
        Connection connection = connectionFactory.newConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        /*DefaultConsumer consumer = new DefaultConsumer(channel) 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                String msg = new String(body);
                System.out.println(" 消费者 接收到消息'" + msg + "'");
            
        ;
        channel.basicConsume(QUEUE_NAME,true,consumer);*/
        DeliverCallback deliverCallback = (consumerTag, deliver) -> 
            String message = new String(deliver.getBody(), "UTF-8");
            System.out.println(" 消费者1 接收到消息'" + message + "'");
        ;
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> 
        );
    

先启动消费者再启动生产者,web端会有变化,且消费者会受到生产者发来的消息


消息模型

创建一个连接工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConnectionUtil 

    public static Connection getConnection() throws IOException, TimeoutException 
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //设置账户信息
        factory.setUsername("guest");
        factory.setPassword("guest");
        //通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    

基本消息模型


生产者发送消息

public class send 

    private final static String QUEUE_NAME = "testQueue";

    public static void main(String[] args) 
        try 
            //通过工厂获取连接
            Connection connection = MQConnectionUtil.getConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //消息
            String msg = "Hello World";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者已发送" + msg);
         catch (IOException | TimeoutException e) 
            e.printStackTrace();
        
    

消费者消费消息

public class Recv 

    private final static String QUEUE_NAME = "testQueue";

    public static void main(String[] args) throws Exception 
        //与mq服务建立连接
        Connection connection = MQConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DefaultConsumer consumer = new DefaultConsumer(channel) 
            //获取消息,并且处理,有消息时会自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                //body就是消息体
                String msg = new String(body);
                System.out.println(" 消费者 接收到消息'" + msg + "'");
            
        ;
        channel.basicConsume(QUEUE_NAME, true, consumer);
    

消息的接收与消费使用都需要在一个匿名内部类DefaultConsumer中完成.

注意: 队列需要提前声明,如果未声明就是用队列,则会报错.

生产者和消费者都声明队列,队列的创建会保证幂等性,两个都声明同一个队列,就只会创建一个队列.

WorkQueues 工作队列模型

在基本消息模型中,一个生产者对应一个消费者,而实际生产过程中,往往消息生产会发送很多条消息,如果消费者只有一个的话效率就会很低,因此rabbitmq有另外一种消息模型,这种模型下,一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。

生产者生产20条消息

public class send 

    private final static String QUEUE_NAME = "testQueue";

    public static void main(String[] args) 
        try 
            //通过工厂获取连接
            Connection connection = MQConnectionUtil.getConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 20; i++) 
                //消息
                String msg = "task.." + i;
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者已发送" + msg);
                Thread.sleep(500);
            
            channel.close();
            connection.close();
         catch (IOException | TimeoutException | InterruptedException e) 
            e.printStackTrace();
        
    

消费者1和消费者2

public class Recv1 

    private final static String QUEUE_NAME = "testQueue";

    public static void main(String[] args) throws Exception 
        //与mq服务建立连接
        Connection connection = MQConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DefaultConsumer consumer = new DefaultConsumer(channel) 
            //获取消息,并且处理,有消息时会自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                //body就是消息体
                String msg = new String(body);
                System.out.println(" 消费者1 接收到消息'" + msg + "'");
                try 
                    Thread.sleep(50);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        ;
        channel.basicConsume(QUEUE_NAME, true, consumer);
    


public class Recv2 

    private final static String QUEUE_NAME = "testQueue";

    public static void main(String[] args) throws Exception 
        //与mq服务建立连接
        Connection connection = MQConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DefaultConsumer consumer = new DefaultConsumer(channel) 
            //获取消息,并且处理,有消息时会自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                //body就是消息体
                String msg = new String(body);
                System.out.println(" 消费者2 接收到消息'" + msg + "'");
                try 
                    Thread.sleep(50);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        ;
        channel.basicConsume(QUEUE_NAME, true, consumer);
    

订阅模式

订阅模式中,可以实现一条消息被多个消费者获取。在这种模型下,消息传递过程中比之前多了一个exchange交换机,生产者不是直接发送消息到队列,而是先发送给交换机,经由交换机分配到不同的队列,而每个消费者都有自己的队列:

交换机的类型

  1. Fanout:广播模式,交换机将消息发送到所有与之绑定的队列中去
  2. Direct:定向,交换机按照指定的Routing Key发送到匹配的队列中去
  3. Topics: 通配符,与定向大致相同,不同在于Routing Key可以根据通配符进行匹配

广播模式Fanout

  1. 多个消费者,独立队列
  2. 需要与exchange绑定
  3. 生产者发消息给exchange
  4. exchange将消息发送到所有绑定的队列中
  5. 消费者从各自的队列获取消息

生产者

public class Send 

    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) 
        try 
            Connection connection = MQConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String msg = "广播模式1";
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
            System.out.println("广播发送消息:" + msg);
            channel.close();
            connection.close();
         catch (IOException | TimeoutException e) 
            e.printStackTrace();
        
    

消费者

public class Consumer1 

    //独立队列
    private static final String QUEUE_NAME = "fanout_queue_1";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) 
        try 
            Connection connection = MQConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //消费者1声明自己的队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //声明Exchange,类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //消费者将队列与交换机绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel)
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String msg = new String(body);
                    System.out.println("消费者1获得消息:" + msg);
                
            );

         catch (IOException | TimeoutException e) 
            e.printStackTrace();
        
    

其他消费者只需修改QUEUE_NAME即可

注意:exchange与队列一样都需要提前声明,如果未声明就使用交换机,则会报错。如果不清楚生产者和消费者谁先声明,为了保证不报错,生产者和消费者都声明交换机,同样的,交换机的创建也会保证幂等性。

定向模式Direct

路由模式,可以实现不同的消息被不同队列消费,在direct中,交换机不在将消息发送给所有绑定的队列,而是根据Routing Key将消息发送给指定的队列,队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息也需要携带一个Routing Key.

消费者C1的队列与交换机绑定时设置的Routing Key是“error”, 而C2的队列与交换机绑定时设置的Routing Key包括三个:“info”,“error”,“warning”,假如生产者发送一条消息到交换机,并设置消息的Routing Key为“info”,那么交换机只会将消息发送给C2的队列。

生产者

public class Send 

    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) 
        try

以上是关于了解MQ和安装使用RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

了解MQ和安装使用RabbitMQ

RabbitMQ详解------简介与安装(Docker)

RabbitMQ安装

RabbitMQ 服务异步通信 -- 初识MQ(同步通信和异步通信MQ几种常见MQ的对比)RabbitMQ安装和介绍

java架构之路-(MQ专题)RabbitMQ安装和基本使用

Node下RabbitMQ的使用