RabbitMQ介绍
Posted love the future
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ介绍相关的知识,希望对你有一定的参考价值。
文章目录
- 1.什么是RabbitMQ?
- 2.为什么要使用Rabbit MQ
- 3.RabbitMQ工作原理
- 4.RabbitMQ的安装
- 5.Rabbit MQ的四大核心
- 6.RabbitMQ的工作模式
- 7.延迟队列
- 8.总结
1.什么是RabbitMQ?
1.1MQ的概念
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需用专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
1.2MQ的分类
1.2.1.ActiveMQ
优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点:官方社区现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用。
1.2.2.Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。
优点: 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
1.2.3.RocketMQ
RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java我们可以自己阅读源码,定制自己公司的MQ
缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟;社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
1.2.4.RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
2.为什么要使用Rabbit MQ
2.1解耦
- 例如:在在线购物系统中,用户下单之后,需要通知库存系统,让库存系统进行相应的库存处理。即订单系统需要调用库存系统的接口
假如库存系统无法访问,则订单系统调用库存系统将失败,从而导致订单失败,即订单系统与库存系统耦合 - 引入Rabbit MQ之后
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。基于消息的模型,关心的是“通知”,而非“处理”。
2.2异步:可以提升系统的效率
例如:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
(1)串行方式:
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2)并行方式:
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
(3)引入消息队列
将不是必须的业务逻辑,异步处理。改造后的架构如下:通过引入消息队列,用户不用等待邮件和短信的发送就可以得到响应,发送邮件和短信的操作通过从消息队列中拿到消息进行异步处理
2.3 流量消峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛应用场景:
A系统其他时间每秒请求量就100个,系统可以稳定运行。
系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。
之前架构:
大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的用户请求mysql操作,每秒钟预计执行3000条SQL。
但是一般的MySQL每秒只能处理2000个请求,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。
但是这种高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。
引入MQ:
100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。系统从消息队列MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。
2.4 缺点
优点
优点就是以上的那些场景应用,就是在特殊场景下有其对应的好处,解耦、异步、削峰。
缺点
系统的可用性降低系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。
系统的复杂性提高引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?一致性问题A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。
3.RabbitMQ工作原理
Broker:
接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host:
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:
publisher/consumer和broker之间的TCP连接
Channel:
如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
Exchange:
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:
消息最终被送到这里等待consumer取走
Binding:
exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
4.RabbitMQ的安装
为了发挥RabbitMQ的最大性能,安装在linux系统中,由于RabbitMQ是依赖于erlang的,所以需要先安装erlang。
官网下载地址:https://www.rabbitmq.com/download.html
4.1.文件上传
将下载的文件上传到/opt/rabbitMQ目录下(如果没有rabbitMQ需要自己创建)
erlang-21.3-1.el7.x86_64.rpm
rabbitmq-server-3.8.8-1.el7.noarch.rpm
4.2.安装文件
依次执行下面命令
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
4.3.启动Rabbit MQ服务
添加开机启动RabbitMQ服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
下面表示已将启动
4.4.添加Web管理插件
在添加插件之前,需要将Rabbit MQ服务停止
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启web管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest/guest)访问地址http://47.115.185.244:15672
根据访问地址进行访问,访问不了,需要设置访问权限才能进行访问。
4.5设置访问权限
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p ]
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
查看当前用户和角色
rabbitmqctl list_users
4.6利用刚创建的用户进行登录即可
登录之前将RabbitMQ服务启动,要不然无法登录
5.Rabbit MQ的四大核心
生产者
产生数据发送消息的程序是生产者
交换机
交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
6.RabbitMQ的工作模式
6.1 简单模式
简单模式中生产者发送单个消息给消息队列,单个消费者对消息进行消费(处理)
即生产者发送一条消息,消费者接收一条消息进行消费。
在Java开发环境中创建项目,代码如下
pom.xml文件引入相应的依赖包:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lsy</groupId>
<artifactId>rabbitMQ_hello</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<version>3.8.1</version>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies> <!--rabbitmq依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency> <!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
</project>
生产者代码:Producer.java
package com.lsy.rabbit.one;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息队列的生产者:发消息
*/
public class Producer
//队列名称
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.6.100");
factory.setUsername("lsy");
factory.setPassword("123");
//申请通道
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
/** * 生成一个队列 *
* 1.队列名称 *
* 2.队列里面的消息是否持久化 默认消息存储在内存中 *
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true可以多个消费者消费 *
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 *
* 5.其他参数
* */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="Hello world";
/**
* * 发送一个消息
* * 1.发送到那个交换机
* * 2.路由的key是哪个
* * 3.其他的参数信息
* * 4.发送消息的消息体
* */
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完成");
消费者代码: Consumer.java
package com.lsy.rabbit.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer
//队列名称,消费者和生产者的队列名字相同
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.6.100");
factory.setUsername("lsy");
factory.setPassword("123");
//建立连接,连接通道
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->
String message= new String(delivery.getBody());
System.out.println(message);
;
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->
System.out.println("消息消费被中断");
;
//消费者接受消息
/**
** 消费者消费消息 *
* 1.消费哪个队列 *
* 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答 *
* 3.消费者未成功消费的回调
* 4.消费者取消消费
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
6.2 工作模式
6.2.1轮询发布消息
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
即多个工作线程共同消费消息队列中的信息,避免有些消息需要处理的时间长,而其他的消息进行等待。并且消息队列通过轮询的方式分发消息。
消费者发送四条消息:A,B,C,D
工作线程1接收消息:A,C
工作线程2接收消息:B,D
实现代码如下:
抽取工具类
package com.lsy.rabbit.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
获取连接的工具类
*/
public class RabbitMQUtil
public static Channel getChannel() throws IOException, TimeoutException
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.6.100");
factory.setUsername("lsy");
factory.setPassword("123");
//建立连接,连接通道
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
return channel;
启动两个工作线程
package com.lsy.rabbit.workQueue;
import com.lsy.rabbit.util.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
一个工作线程,相当于一个消费者
*/
public class Worker01
//队列名称
public static final String QUEUE_NAME="hello";
//队列--->工作线程
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//接收消息
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->
String message= new String("接收到的消息:"+new String(delivery.getBody()));
System.out.println(message);
;
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->
System.out.println(consumerTag+"消息消费被中断");
;
/**
** 消费者消费消息 *
* 1.消费哪个队列 *
* 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答 *
* 3.消费者未成功消费的回调
* 4.消费者取消消费
*/
System.out.println("C2等待接收消息。。。");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
开启多线程的方法,可以通过两次执行同一份代码,来启动两个工作线程。
启动生产者
package com.lsy.rabbit.workQueue;
import com.lsy.rabbit.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/*
生产者大量发送消息。
发送:
A :C1
B :C2
C :C1
D :C2
*/
public class Task01
//队列名称
public static final String QUEUE_NAME="hello";
public static void main(StringRabbitMQ延迟队列