RabbitMQ快速入门

Posted 汤键.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ快速入门相关的知识,希望对你有一定的参考价值。

目录

MQ概述

优势

应用解耦

异步提速

削峰填谷

劣势

系统可用性降低

系统复杂度提高

一致性问题

思考

RabbitMQ

AMQP

RabbitMQ

RabbitMQ中的相关概念

Broker

Virtual host

Connection

Channel

Exchange

Queue

Binding

工作模式

JMS

快速入门

生产者Moudle

消费者Moudle

测试


  • MQ概述

  • MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器
  • 多用于分布式系统之间进行通信
  • MQ,消息队列,存储消息的中间件
  • 分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
  • 发送方称为生产者,接收方称为消费者
  • 优势

  • 应用解耦

  • 如果我们的应用与应用直连的话,如果一方宕机,可能会导致整个系统不能使用
  • 使用MQ就可以使得应用间解耦,提升容错性和可维护性
  • 异步提速

  • 如下图所示

  • 当下一个订单之后,订单系统需要连接库存系统,支付系统以及物流系统
  • 而与每个系统交互的时间为300ms,在每得到一个响应时才能与下个系统进行交互
  • 最后还要与数据库交互20ms,那么总共就是920ms
  • 但是当使用MQ之后,当有用户下单时
  • 只需要将订单放到MQ中,然后其它系统异步去获取,不需要等待
  • 放一条消息为5ms,然后与数据库交互是20ms,那么用户感知的整个过程就只有25ms
  • 其它工作都会异步执行,效率提高了很多
  • 削峰填谷

  • 如下图所示

  • A系统每次只能处理最大1000个请求
  • 但是某一瞬间请求数量突然增多,达到了每秒5000个请求
  • 这时,如果用了MQ,A系统就不会直接接收到这5000个请求
  • 而是自己会去MQ中每秒拿1000个请求去处理,这就叫削峰填谷
  • 劣势

  • 系统可用性降低

  • 系统引入的外部依赖越多,系统稳定性越差
  • 一旦MQ宕机,就会对业务造成影响
  • 系统复杂度提高

  • MQ 的加入大大增加了系统的复杂度
  • 以前系统间是同步的远程调用,现在是通过MQ进行异步调用
  • 一致性问题

  • A系统处理完业务
  • 通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败
  • 如何保证消息数据处理的一致性?
  • 思考

  • 既然MQ有优势也有劣势
  • 那么使用MQ需要满足什么条件呢?
  • 1-生产者不需要从消费者处获得反馈
  • 引入消息队列之前的直接调用,其接口的返回值应该为空
  • 这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能
  • 2-容许短暂的不一致性
  • 3-确实是用了有效果
  • 即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本
  • RabbitMQ

  • AMQP

  • 即Advanced Message Queuing Protocol(高级消息队列协议)
  • 是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计
  • 基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
  • 2006年,AMQP规范发布,类比HTTP
  • RabbitMQ

  • 2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布
  • RabbitMQ 采用 Erlang 语言开发
  • Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛
  • RabbitMQ 基础架构如下图

  • RabbitMQ中的相关概念

  • Broker

  • 接收和分发消息的应用
  • RabbitMQ Server就是 Message Broker
  • Virtual host

  • 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中
  • 类似于网络中的namespace概念
  • 当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost
  • 每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection

  • publisher/consumer 和 broker 之间的 TCP连接
  • Channel

  • 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低
  • Channel 是在 connection 内部建立的逻辑连接
  • 如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯
  • AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的
  • Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange

  • 交换机,message 到达 broker 的第一站
  • 根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去
  • 常用的类型有:
  • direct (point-to-point)–路由定向
  • topic (publish-subscribe)–通配符
  • fanout (multicast)–广播
  • Queue

  • 消息最终被送到这里等待 consumer 取走
  • Binding

  • exchange 和 queue 之间的虚拟连接
  • binding 中可以包含 routing key
  • Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
  • 工作模式

  • RabbitMQ提供了6种工作模式:
  • 简单模式
  • work queues
  • Publish/Subscribe 发布与订阅模式
  • Routing路由模式
  • Topics主题模式
  • RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)
  • JMS

  • JMS 即 Java消息服务(JavaMessage Service)应用程序接口
  • 是一个Java平台中关于面向消息中间件的API
  • JMS是JavaEE规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范
  • 例如:ActiveMQ;RabbitMQ官方没有提供JMS的实现包,但是开源社区有
  • 快速入门

  • 在下图的模型中,有以下概念:
    • P:生产者,也就是要发送消息的程序
    • C:消费者,消息的接收者,会一直等待消息到来
    • queue:消息队列,图中红色部分
    • 类似一个邮箱,可以缓存消息;
    • 生产者向其中投递消息,消费者从其中取出消息

  • 需求: 使用简单模式完成消息传递
  • 步骤:
    • (1)创建工程(生成者、消费者),分别添加依赖
    • (2)编写生产者发送消息
    • (3)编写消费者接收消息
  • 添加Virtual Hosts

  • 生产者Moudle

  • pom.xml

  • <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>RabbitMQ-producer</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <!--rabbitmq java 客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.6.0</version>
            </dependency>
        </dependencies>
    
    </project>
  • 生产者类

  • import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    /**
     * @author Tang
     * @version 2022.2
     * @code 2023/1/10 19:07
     */
    
    //发送消息
    public class Producer_hello 
        public static void main(String[] args) throws Exception
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.11.11"); //ip 默认值localhost
            factory.setPort(5672); //端口 默认值5672
            factory.setVirtualHost("/icpc"); //虚拟机 默认值/
            factory.setUsername("admin");//用户名 默认guest
            factory.setPassword("admin");//密码 默认值guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建Channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue
            /*
              【API说明】:
              queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
              参数:
              (1)String queue:队列名称。如果当前队列名称不存在,则会创建该队列,否则不会创建
              (2)boolean durable:是否持久化。即队列在服务器重启后是否还存在
              (3)boolean exclusive:是否独占。只能有一个消费者监听这队列;当Connection关闭时,是否删除队列
              (4)boolean autoDelete:是否自动删除。如果为true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。
              (5)Map<String, Object> arguments:附带参数。队列的其他属性,例如:x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority
             */
            channel.queueDeclare("helloworld_queue",true,false,false,null);
            //6.发送消息
            /*
             * 【API说明】:
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 参数:
             * (1)String exchange:交换机名称。简单模式下交换机会使用默认的 ""
             * (2)String routingKey:路由名称
             * (3)BasicProperties props:配置信息
             * (4)byte[] body:发送消息数据
             */
            String body = "hello rabbitmq~~~";
            channel.basicPublish("","helloworld_queue",null,body.getBytes());
            //7.释放资源
            channel.close();
            connection.close();
        
    
  • 消费者Moudle

  • pom.xml

  • 消费者类

  • import com.rabbitmq.client.*;
    
    
    /**
     * @author Tang
     * @version 2022.2
     * @code 2023/1/10 19:12
     */
    public class Consumer_hello 
        public static void main(String[] args) throws Exception 
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.11.11"); //ip 默认值localhost
            factory.setPort(5672); //端口 默认值5672
            factory.setVirtualHost("/icpc"); //虚拟机 默认值/
            factory.setUsername("admin");//用户名 默认guest
            factory.setPassword("admin");//密码 默认值guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建Channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue
            /*
              【API说明】:
              queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
              参数:
              (1)String queue:队列名称。如果当前队列名称不存在,则会创建该队列,否则不会创建
              (2)boolean durable:是否持久化。即队列在服务器重启后是否还存在
              (3)boolean exclusive:是否独占。只能有一个消费者监听这队列;当Connection关闭时,是否删除队列
              (4)boolean autoDelete:是否自动删除。如果为true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。
              (5)Map<String, Object> arguments:附带参数。队列的其他属性,例如:x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority
             */
            channel.queueDeclare("helloworld_queue",true,false,false,null);
            //6.接收信息
            Consumer consumer = new DefaultConsumer(channel)
                /*
                 * 回调方法,当收到消息后,会自动执行该方法
                 * void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                 * 参数:
                 * (1)String consumerTag:标识
                 * (2)Envelope envelope:获取一些信息,交换机,路由key...
                 * (3)AMQP.BasicProperties properties:配置信息
                 * (4)byte[] body:数据
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 
                    System.out.println("consumerTag:" + consumerTag);
                    System.out.println("Exchange:" + envelope.getExchange());
                    System.out.println("RoutingKey:" + envelope.getRoutingKey());
                    System.out.println("properties:" + properties);
                    System.out.println("body:" + new String(body));
                
            ;
            /*
             * 【API说明】:
             * basicConsume(String queue, boolean autoAck, Consumer callback)
             * 参数:
             * (1)String queue:队列名称
             * (2)boolean autoAck:是否自动确认
             * (3)Consumer callback:回调对象
             */
            channel.basicConsume("helloworld_queue", true, consumer);
            //7.关闭资源(消费者需要监听生产者消息,这里不关闭)
        
    
  • 测试

  • (1)先启动消费者类,消费监听生产者消息
  • (2)再启动生产者类生产消息,消费者消费消息,即控制台输出:

以上是关于RabbitMQ快速入门的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 第二课 快速入门

RabbitMQ 第二课 快速入门

快速入门分布式消息队列之 RabbitMQ(中)

ASP.NET Core消息队列RabbitMQ基础入门实战演练

RabbitMQ 概念

RabbitMQ入门教程——发布/订阅