RabbitMQ介绍

Posted love the future

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ介绍相关的知识,希望对你有一定的参考价值。

文章目录

1.什么是RabbitMQ?

1.1MQ的概念

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需用专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

1.2MQ的分类

1.2.1.ActiveMQ

优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点:官方社区现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用。

1.2.2.Kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。
优点: 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;

1.2.3.RocketMQ

RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java我们可以自己阅读源码,定制自己公司的MQ
缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟;社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

1.2.4.RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

2.为什么要使用Rabbit MQ

2.1解耦

  • 例如:在在线购物系统中,用户下单之后,需要通知库存系统,让库存系统进行相应的库存处理。即订单系统需要调用库存系统的接口

    假如库存系统无法访问,则订单系统调用库存系统将失败,从而导致订单失败,即订单系统与库存系统耦合
  • 引入Rabbit MQ之后

    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
    库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
    假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
    为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。基于消息的模型,关心的是“通知”,而非“处理”。

2.2异步:可以提升系统的效率

例如:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

(1)串行方式:

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

(2)并行方式:

将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

(3)引入消息队列

将不是必须的业务逻辑,异步处理。改造后的架构如下:通过引入消息队列,用户不用等待邮件和短信的发送就可以得到响应,发送邮件和短信的操作通过从消息队列中拿到消息进行异步处理

2.3 流量消峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛应用场景:
A系统其他时间每秒请求量就100个,系统可以稳定运行。
系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。

之前架构:

大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的用户请求mysql操作,每秒钟预计执行3000条SQL。
但是一般的MySQL每秒只能处理2000个请求,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。
但是这种高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。

引入MQ:

100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。系统从消息队列MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。

2.4 缺点

优点

优点就是以上的那些场景应用,就是在特殊场景下有其对应的好处,解耦、异步、削峰。

缺点

系统的可用性降低系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。
系统的复杂性提高引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?一致性问题A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。

3.RabbitMQ工作原理

Broker:

接收和分发消息的应用,RabbitMQ Server就是Message Broker

Virtual host:

出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:

publisher/consumer和broker之间的TCP连接

Channel:

如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销

Exchange:

message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:

消息最终被送到这里等待consumer取走

Binding:

exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据

4.RabbitMQ的安装

为了发挥RabbitMQ的最大性能,安装在linux系统中,由于RabbitMQ是依赖于erlang的,所以需要先安装erlang。
官网下载地址:https://www.rabbitmq.com/download.html

4.1.文件上传

将下载的文件上传到/opt/rabbitMQ目录下(如果没有rabbitMQ需要自己创建)
erlang-21.3-1.el7.x86_64.rpm
rabbitmq-server-3.8.8-1.el7.noarch.rpm

4.2.安装文件

依次执行下面命令
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

4.3.启动Rabbit MQ服务

添加开机启动RabbitMQ服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
下面表示已将启动

4.4.添加Web管理插件

在添加插件之前,需要将Rabbit MQ服务停止
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启web管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest/guest)访问地址http://47.115.185.244:15672

根据访问地址进行访问,访问不了,需要设置访问权限才能进行访问。

4.5设置访问权限

创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p ]
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
查看当前用户和角色
rabbitmqctl list_users

4.6利用刚创建的用户进行登录即可

登录之前将RabbitMQ服务启动,要不然无法登录

5.Rabbit MQ的四大核心

生产者

产生数据发送消息的程序是生产者

交换机

交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列

队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

6.RabbitMQ的工作模式

6.1 简单模式

简单模式中生产者发送单个消息给消息队列,单个消费者对消息进行消费(处理)
即生产者发送一条消息,消费者接收一条消息进行消费。


在Java开发环境中创建项目,代码如下
pom.xml文件引入相应的依赖包:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lsy</groupId>
    <artifactId>rabbitMQ_hello</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <version>3.8.1</version>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies> <!--rabbitmq依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency> <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

</project>

生产者代码:Producer.java

package com.lsy.rabbit.one;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消息队列的生产者:发消息
 */
public class Producer 
    //队列名称
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws IOException, TimeoutException 
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("lsy");
        factory.setPassword("123");
        //申请通道
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        /** * 生成一个队列 *
         * 1.队列名称 *
         * 2.队列里面的消息是否持久化 默认消息存储在内存中 *
         * 3.该队列是否只供一个消费者进行消费 是否进行共享 true可以多个消费者消费 *
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 *
         * 5.其他参数
         * */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message="Hello world";
        /**
         * * 发送一个消息
         * * 1.发送到那个交换机
         * * 2.路由的key是哪个
         * * 3.其他的参数信息
         * * 4.发送消息的消息体
         * */

        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("消息发送完成");
    



消费者代码: Consumer.java

package com.lsy.rabbit.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer 
    //队列名称,消费者和生产者的队列名字相同
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws IOException, TimeoutException 
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("lsy");
        factory.setPassword("123");
        //建立连接,连接通道
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();

        System.out.println("等待接收消息....");

        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->
        String message= new String(delivery.getBody());
        System.out.println(message);
        ;

        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->
           System.out.println("消息消费被中断");
        ;
        //消费者接受消息
        /**
         ** 消费者消费消息 *
         * 1.消费哪个队列 *
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答 *
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    


6.2 工作模式

6.2.1轮询发布消息

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
即多个工作线程共同消费消息队列中的信息,避免有些消息需要处理的时间长,而其他的消息进行等待。并且消息队列通过轮询的方式分发消息。
消费者发送四条消息:A,B,C,D
工作线程1接收消息:A,C
工作线程2接收消息:B,D
实现代码如下:

抽取工具类

package com.lsy.rabbit.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
获取连接的工具类

 */
public class RabbitMQUtil 

    public static Channel getChannel() throws IOException, TimeoutException 
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("lsy");
        factory.setPassword("123");
        //建立连接,连接通道
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        return channel;
    




启动两个工作线程

package com.lsy.rabbit.workQueue;


import com.lsy.rabbit.util.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
一个工作线程,相当于一个消费者

 */
public class Worker01 
    //队列名称
    public static final String QUEUE_NAME="hello";
    //队列--->工作线程
    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //接收消息

        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->
            String message= new String("接收到的消息:"+new String(delivery.getBody()));
            System.out.println(message);
        ;

        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->
            System.out.println(consumerTag+"消息消费被中断");
        ;
        /**
         ** 消费者消费消息 *
         * 1.消费哪个队列 *
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答 *
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费
         */
        System.out.println("C2等待接收消息。。。");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    




开启多线程的方法,可以通过两次执行同一份代码,来启动两个工作线程。

启动生产者

package com.lsy.rabbit.workQueue;

import com.lsy.rabbit.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/*
生产者大量发送消息。
发送:
A  :C1
B  :C2
C  :C1
D  :C2


 */
public class Task01 
    //队列名称
    public static final String QUEUE_NAME="hello";

    public static void main(StringRabbitMQ延迟队列

RabbitMQ一文带你搞定RabbitMQ延迟队列

RabbitMQ 死信队列DLX

RabbitMQ-死信队列

RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)

RabbitMQ死信队列