一篇了解RabbitMQ

Posted haidi8

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一篇了解RabbitMQ相关的知识,希望对你有一定的参考价值。

文章目录

消息中间件引言

1.消息队列概述

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦异步消息流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等

message queue MQ

2.消息队列应用场景

以下介绍消息队列在实际应用中常用的使用场景。异步处理应用解耦流量削峰消息通讯四个场景。

2.1.异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。

传统的做法有两种 1.串行的方式; 2.并行方式

a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2.2.应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

下单流程

​ 订单服务 新增订单

​ 库存服务 减库存

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.3.流量削峰

流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理

2.4.日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下

日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据

2.5.消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等

点对点通讯:

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

3.消息中间件示例

3.1.电商系统

消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket MQ。

(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
(2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。

(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

3.2.日志收集系统


分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(Other App)四部分组成。

Zookeeper注册中心,提出负载均衡和地址查找服务日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
Kafka集群:接收,路由,存储,转发等消息处理
Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

4.MQ选型对比文档

Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

1、 RabbitMq比kafka成熟,在可用性上,稳定性上,可靠性上,RabbitMq超过kafka
2、 Kafka设计的初衷就是处理日志的,可以看做是一个日志系统,针对性很强,所以它并没有具备一个成熟MQ应该具备的特性
3、 但是Kafka的性能(吞吐量、tps)比RabbitMq要强,Kafka 自身服务和消费者都需要依赖 Zookeeper。

  1. RabbitMQ 在有大量消息堆积的情况下性能会下降,Kafka不会。毕竟AMQP设计的初衷不是用来持久化海量消息的,而Kafka一开始是用来处理海量日志的。

总的来说,RabbitMQ 和 Kafka 都是十分优秀的分布式的消息代理服务,只要合理部署,不作,基本上可以满足生产条件下的任何需求。

RabbitMQ基础

1.RabbitMQ初识

1.1.RabbitMQ简介

MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

1.2.消息队列应用场景

开发中消息队列通常有如下应用场景:
1、任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

1.3.市场上常见的消息队列产品

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(发布订阅模型)。

1.4.Why RabbitMQ?

1、使得简单,功能强大。 
2、基于AMQP协议。 
3、社区活跃,文档完善。 
4、高并发性能好,这主要得益于Erlang语言(高并发语言)。 
5、Spring Boot默认已集成RabbitMQ,SpringBoot把RabbitMQ自动整合!自动化配置!

   org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration

2.相关概念

2.1.AMQP是什么

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

总结:AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务

RabbitMQ 就是基于 AMQP 协议实现的。

官方:http://www.amqp.org/

3.RabbitMQ工作原理

下图是RabbitMQ的基本结构:

组成部分说明如下:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布流程:

1、生产者和Broker建立TCP连接。 
2、生产者和Broker建立通道。 
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。 
4、Exchange将消息转发到指定的Queue(队列)

消息接收流程:

1、消费者和Broker建立TCP连接 
2、消费者和Broker建立通道 
3、消费者监听指定的Queue(队列) 
4、当有消息到达Queue时Broker默认将消息推送给消费者。 
5、消费者接收到消息。

4.RabbitMQ安装

4.1. 下载安装

RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需 要安装Erlang/OTP,并保持版本匹配,
如下图: RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

1)下载erlang 地址如下: http://erlang.org/download/otp_win64_20.3.exe
以管理员方式运行此文件,安装。 erlang安装完成需要配置erlang环境变量: ERLANG_HOME=G:\\softDevelopment\\erlang\\erl10.1 在path中添 加%ERLANG_HOME%\\bin;
2)安装RabbitMQ https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3
以管理员方式运行此文件,安装。

3.2. 启动

启动安装成功后会自动创建RabbitMQ服务并且启动。

1)从开始菜单启动RabbitMQ 完成在开始菜单找到RabbitMQ的菜单:

RabbitMQ Service-install :安装服务 
RabbitMQ Service-remove 删除服务 
RabbitMQ Service-start 启动 
RabbitMQ Service-stop 停止

2)如果没有开始菜单则进入安装目录下sbin目录手动启动:

3.3. 启动安装管理插件

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ 管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest

3.4. 注意事项

1、安装erlang和rabbitMQ以管理员身份运行
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang 搜索RabbitMQ、ErlSrv,将对应的项全部删除

4.Hello World

4.1.需求分析

按照官方教程(http://www.rabbitmq.com/getstarted.html)

测试hello world:

4.2.搭建环境

Java客户端:生产者和消费者
生产者和消费者都属于客户端,rabbitMQ的java客户端如下:

我们先用 rabbitMQ官方提供的java client测试,目的是对RabbitMQ的交互过程有个清晰的认识。 参考 :https://github.com/rabbitmq/rabbitmq-java-client/
创建maven工程 创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。

 test-rabbitmq-producer:生产者工程 
 test-rabbitmq-consumer:消费者工程

项目中所需的依赖

 <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>3.6.5</version>
 </dependency>

4.3.生产者

在生产者工程下的test中创建测试类如下:

package com.bruceliu.producer001;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bruceliu
 * @create 2019-10-23 11:34
 * @description
 */
public class Procducer001 

    //队列名称
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) 

        Connection connection = null;
        Channel channel = null;
        try 
            ConnectionFactory factory = new ConnectionFactory();
            
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");

            //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            factory.setVirtualHost("/");

            // 创建与RabbitMQ服务的TCP连接
            connection = factory.newConnection();

            // /创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();

            /*** 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             ***/
            channel.queueDeclare(QUEUE, true, false, false, null);
            
            
            String message = "helloworld---我是一条测试消息" + System.currentTimeMillis();

            /***
             * 消息发布方法
             * param1:Exchange的名称,如果没有指定,则使用Default Exchange
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 示绑定或解除绑定
             * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             * param3:消息包含的属性
             * param4:消息体
             */
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("Send Mes!!sage is:'" + message + "'");
         catch (Exception ex) 
            ex.printStackTrace();
         finally 
            try 
                if (channel != null) 
                    channel.close();
                
                if (connection != null) 
                    connection.close();
                
             catch (IOException e) 
                e.printStackTrace();
             catch (TimeoutException e) 
                e.printStackTrace();
             finally 
            
        

    


4.4.消费者

在消费者工程下的test中创建测试类如下:

package com.bruceliu.consumer001;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author bruceliu
 * @create 2019-10-23 11:45
 * @description
 */
public class Consumer001 

    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws Exception 

        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);

        // 定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) 
            /*** 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             * @throws IOException*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                //交换机
                String exchange = envelope.getExchange();
                // /路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body,"UTF-8");
                System.out.println("receive message.." + msg);
            
        ;
        /*** 监听队列
         * String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE, true, consumer);

    


4.5.总结

发送端操作流程

1)创建连接 
2)创建通道 
3)声明队列 
4)发送消息 

接收端

1)创建连接 
2)创建通道 
3)声明队列 
4)监听队列 
5)接收消息
6)ack回复

docker部署RabbitMq

1、查询rabbitmq镜像

docker search rabbitmq:management

2、拉取rabbitmq镜像

docker pull rabbitmq:management

3、创建并启动容器

创建和启动(同时设置用户和密码)

docker run -d --hostname rabbit --name rabbitmq --restart always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -v /etc/localtime:/etc/localtime:ro -v /usr/local/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

其中:

15672:控制台端口号  Web插件访问端口
5672:应用访问端口号  程序访问端口

控制台端口用于管理rabbitmq,应用访问端口号为应用程序访问

4、查看rabbitmq运行状况

docker logs rabbit

5、访问

http://ip:15672

6、登录

默认账户名:guest
密码:guest

启动修改过账号: admin admin

RabbitMq常见工作模式

1.简介

最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件。本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结。RabbitMQ常用的工作模式有:简单队列模式工作队列模式发布订阅模式路由模式主题模式。本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html

2.简单队列模式(Simple Queue)

【模型图】
只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。(单生产单消费)

上图中,“P”是我们的生产者,“C”是我们的消费者。

【获取MQ连接对象工具类】

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MQConnecitonUtils 

    private static final String RABBITMQ_HOST = "127.0.0.1";
    private static final Integer RABBITMQ_PORT = 5672;
    private static final String RABBITMQ_VHOST = "/";
    private static final String RABBITMQ_USERNAME = "guest";
    private static final String RABBITMQ_PASSWORD = "guest";

    public static Connection getConnection() 
        //定义MQ连接对象
        Connection connection = null;
        //创建MQ连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置MQ主机名称
        connectionFactory.setHost(RABBITMQ_HOST);
        // 设置MQ AMQP端口号
        connectionFactory.setPort(RABBITMQ_PORT);
        // 设置MQ 连接的virtual host
        connectionFactory.setVirtualHost(RABBITMQ_VHOST);
        // 设置MQ 用户名称
        connectionFactory.setUsername(RABBITMQ_USERNAME);
        // 设置MQ 用户密码
        connectionFactory.setPassword(RABBITMQ_PASSWORD);
        try 
            connection = connectionFactory.newConnection();
         catch (Exception e) 
            e.printStackTrace();
        
        //返回连接对象
        return connection;
    以上是关于一篇了解RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 死信队列

RabbitMQ实战-死信队列

RabbitMQ一文带你搞定RabbitMQ死信队列

Rabbitmq通过死信队列实现过期监听

SpringBoot+RabbitMQ 死信队列

一篇带您搞懂MQ延迟队列实战操作