RabbitMQ 消息中间件
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息中间件相关的知识,希望对你有一定的参考价值。
1、消息中间件1、简介
消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。
当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不同平台之间进行通信,常用来屏蔽各种平台协议之间的特性,实现应用程序之间的协同。优点在于能够在客户端和服务器之间进行同步和异步的连接,并且在任何时刻都可以将消息进行传送和转发,是分布式系统中非常重要的组件,主要用来解决应用耦合、异步通信、流量削峰等问题。
2、作用
1、消息中间件主要作用
- 解耦
- 冗余(存储)
- 扩展性
- 削峰
- 可恢复性
- 顺序保证
- 缓冲
- 异步通信
2、消息中间件的两种模式
1、P2P模式
P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。
P2P的特点:
- 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
- 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模
2、Pub/Sub模式
Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点:
- 每个消息可以有多个消费者
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
- 为了消费消息,订阅者必须保持运行的状态
- 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
3、常用中间件介绍与对比
1、Kafka
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
2、RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
3、RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
2、RabbitMQ 详解
1、RabbitMQ 介绍
RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、php、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。
2、RabbitMQ 特点
- 可靠性
- 灵活的路由
- 扩展性
- 高可用性
- 多种协议
- 多语言客户端
- 管理界面
- 插件机制
3、AMQP 介绍
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
4、什么和是消息队列
MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
消息队列的使用场景是怎样的?
5、RabbitMQ 应用场景
对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:
1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何防止丢失?
2)如何降低发送者和接收者的耦合度?
3)如何让Priority高的接收者先接到数据?
4)如何做到load balance?有效均衡接收者的负载?
5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。
6)如何做到可扩展,甚至将这个通信模块发到cluster上?
7)如何保证接收者接收到了完整,正确的数据?
AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。
6、RabbitMQ 概念介绍
- Broker:简单来说就是消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息
7、RabbitMQ 使用流程
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange,并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端使用routing key,在exchange和queue之间建立好绑定关系。
- 客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
3、RabbitMQ 单机安装部署
1、安装 erlang
添加yum支持
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -ivh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
yum install erlang
2、安装RabbitMQ
1、用 yum 安装 RabbitMQ
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
#this example assumes the CentOS 7 version of the package
yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
# this example assumes the CentOS 7 version of the package
yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
2、用 rpm 手动安装
下载:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm
上传rabbitmq-server-3.7.13-1.el7.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.7.13-1.el7.noarch.rpm
常用命令
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
chkconfig rabbitmq-server on //设置开机自启
设置配置文件:
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.7.13/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
设置用户远程访问:
vim /etc/rabbitmq/rabbitmq.config
找到{}loopack_users , [] }把","去掉
开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
防火墙设置
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
5、RabbitMQ常用的命令
1、基本命令
启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
启动rabbitmq:rabbitmq-service start
关闭rabbitmq:rabbitmq-service stop
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset
关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app
2、用户和权限设置
添加用户:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost vhost_name
将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username “.” “.” “.”(后面三个””代表用户拥有配置、写、读全部权限)
3、角色说明
- 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 - 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 - 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。4、RabbitMQ 集群部署及配置
署 RabbitMQ Cluster
1、环境要求
1、所有节点需要再同一个局域网内;
2、所有节点需要有相同的 erlang cookie,否则不能正常通信,为了实现cookie内容一致,采用scp的方式进行。
3、准备三台虚拟机,配置相同
rabbitmq01 192.168.101.11
rabbitmq02 192.168.101.12
rabbitmq03 192.168.101.13
操作系统:centos7.5
2、部署过程
1、所有节点配置/etc/hosts
node1 192.168.101.11
node2 192.168.101.12
node3 192.168.101.13
2、所有节点安装 erLang 和 rabbitmq
1、安装erlang
安装依赖包
yum install -y *epel* gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel
编译安装
wget http://erlang.org/download/otp_src_21.3.tar.gz
tar -zxvf otp_src_21.3.tar.gz
cd otp_src_21.3
./configure --prefix=/usr/local/bin/erlang --without-javac
make && make install
echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile
source /etc/profile
出现 erl 命令则说明安装成功;
安装rabbitmq
编译安装
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz
yum install -y xz
xz -d rabbitmq-server-generic-unix-3.6.15.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.15.tar -C /usr/local/bin/
echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile
source /etc/profile
导入 rabbitmq 的管理界面
rabbitmq-plugins enable rabbitmq_management
**设置 erlang
找到erlang cookie文件的位置,官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。
这里将 node1 的该文件复制到 node2、node3,注意这个文件的权限是 400(默认即是400),因此采用scp的方式只拷贝内容即可;
可以通过cat $home/.erlang.cookie来查看三台机器的cookie是否一致,设置erlang的目的是要保证集群内的cookie内容一致。
**使用-detached参数运行各节点
rabbitmqctl stop
rabbitmq-server -detached
然后可以通过 rabbitmqctl cluster_status查看节点状态。
注意:要先拷贝cookie到另外两台机器上,保证三台机器上的cookie是一致的,然后再启动服务。
由于guest这个用户,只能在本地访问,所以我们要新增一个用户并赋予权限:
添加用户并设置密码:
rabbitmqctl add_user admin 123456
添加权限(使admin用户对虚拟主机“/” 具有所有权限):
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
修改用户角色(加入administrator用户组)
rabbitmqctl set_user_tags admin administrator
然后就可以远程访问了,然后可直接配置用户权限等信息。到此,就可以通过http://ip:15672 使用admin 123456 进行登陆了。
到这里的话,每个节点是作为单独的一台RabbitMQ存在的,也可以正常提供服务了
**组成集群
rabbitmq-server 启动时,会一起启动节点和应用,它预先设置RabbitMQ应用为standalone模式。要将一个节点加入到现有的集群中,你需要停止这个应用,并将节点设置为原始状态。如果使用./rabbitmqctl stop,应用和节点都将被关闭。所以使用rabbitmqctl stop_app仅仅关闭应用。
1、将 node2、node3与 node1 组成集群,这里以node2为例
node2# rabbitmqctl stop_app
node2# rabbitmqctl join_cluster rabbit@node1 ####这里集群的名字一定不要写错了
node2# rabbitmqctl start_app
2、将node3重复上述操作,也加入node1的集群。
node3# rabbitmqctl stop_app
node3# rabbitmqctl join_cluster rabbit@node1 ####这里集群的名字一定不要写错了
node3# rabbitmqctl start_app
则此时 node2 与 node3 也会自动建立连接,集群配置完毕;
#使用内存节点加入集群
node2 # rabbitmqctl join_cluster --ram rabbit@node1
3、在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功。
node3# rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
{running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
{cluster_name,<"rabbit@node1">}, #集群的名称默认为 rabbit@node1
{partitions,[]},
{alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]
** 设置镜像队列策略
在任意一个节点上执行如下操作(这里在node1上执行)
首先,在web界面,登陆后,点击“Admin--Virtual Hosts(页面右侧)”,在打开的页面上的下方的“Add a new virtual host”处增加一个虚拟主机,同时给用户“admin”和“guest”均加上权限(在页面直接设置、点点点即可);
然后,在linux中执行如下命令
rabbitmqctl set_policy -p coresystem ha-all "^" ‘{"ha-mode":"all"}‘
"coresystem" vhost名称, "^"匹配所有的队列, ha-all 策略名称为ha-all, ‘{"ha-mode":"all"}‘ 策略模式为 all 即复制到所有节点,包含新增节点。
则此时镜像队列设置成功。(这里的虚拟主机coresystem是代码中需要用到的虚拟主机,虚拟主机的作用是做一个消息的隔离,本质上可认为是一个rabbitmq-server,是否增加虚拟主机,增加几个,这是由开发中的业务决定,即有哪几类服务,哪些服务用哪一个虚拟主机,这是一个规划)。
**镜像队列策略设置说明
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直。完成这 6 个步骤后,RabbitMQ 高可用集群搭建完成,最后一个步骤就是搭建均衡器。
**安装并配置负载均衡器HA
注意:如果使用阿里云,可以使用阿里云的内网slb来实现负载均衡,不用自己搭建HA。
1、在192.168.101.11安装HAProxy
yum -y install HAProxy
2、修改 /etc/haproxy/haproxy.cfg
vim /etc/haproxy/haproxy.cfg
global
log 127.0.0.1 local2
chroot /var/lib/haproxy
pidfile /var/run/haproxy.pid
maxconn 4000
user haproxy
group haproxy
daemon
stats socket /var/lib/haproxy/stats
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 120s
srvtimeout 120s
listen rabbitmq_cluster 192.168.101.11:5670
mode tcp
balance roundrobin
server rabbit1 192.168.101.11:5672 check inter 5000 rise 2 fall 2
server rabbit2 192.168.101.12:5672 check inter 5000 rise 2 fall 2
3、重启HAProxy
service haproxy restart
登录浏览器输入地址http://192.168.101.11:8100/rabbitmqstats查看HAProxy的状态
三、常见问题
常见错误:
1、使用 rabbitmq-server -detached命令启动rabbitmq时,出现以下提示Warning: PID file not written; -detached was passed,此时使用rabbitmqctl status提示服务已启动,可知此问题不用解决。
2、由于更改hostname文件,在每次rabbitmqctl stop或者rabbitmqctl cluster_status等,只要是rabbitmq的命令就报错,提示大概如下
Cluster status of node rabbit@web2 ...
Error: unable to connect to node rabbit@web2: nodedown
DIAGNOSTICS
===========
attempted to contact: [rabbit@web2]
rabbit@web2:
? * connected to epmd (port 4369) on web2
? * epmd reports node ‘rabbit‘ running on port 25672
? * TCP connection succeeded but Erlang distribution failed
? * Hostname mismatch: node "rabbit@mq2" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@mq2"
current node details:
- node name: ‘rabbitmq-cli-11@web2‘
- home dir: /root
- cookie hash: SGwxMdJ3PjEXG1asIEFpBg==
此时先ps aux | grep mq,然后kill -9 该进程,然后再rabbitmq-server -detached即可解决。(即先强杀,再重新启动)
3、使用rabbitmqctl stop,rabbitmq-server -detached重新启动后,原先添加的用户admin、虚拟主机coresystem等均丢失,还需要重新添加。
采用脚本启动,在脚本中写好启动好需要加载的各配置项(创建admin用户并授权,创建虚拟主机并授权,配置镜像队列)。
4、命令
rabbitmqctl stop_app #仅关闭应用,不关闭节点
rabbitmqctl start_app #开启应用
rabbitmq--server -detached #启动节点和应用
rabbitmqctl stop #关闭节点和应用
4、常用命令:
Rabbitmq服务器的主要通过rabbitmqctl和rabbimq-plugins两个工具来管理,以下是一些常用功能。
1、 服务器启动与关闭
启动: rabbitmq-server –detached
关闭: rabbitmqctl stop
若单机有多个实例,则在rabbitmqctl后加 –n 指定名称
2、插件管理
开启某个插件:rabbitmq-plugins enable xxx
关闭某个插件:rabbitmq-plugins disable xxx
注意:重启服务器后生效。
3、virtual_host管理
新建virtual_host:rabbitmqctl add_vhost xxx
撤销virtual_host:rabbitmqctl delete_vhost xxx
4、用户管理
新建用户:rabbitmqctl add_user xxxpwd
删除用户: rabbitmqctl delete_user xxx
查看用户:rabbitmqctl list_users
改密码: rabbimqctl change_password {username} {newpassword}
设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}
Tag可以为 administrator,monitoring, management
5、 权限管理
权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}
Vhostpath: Vhost路径
user: 用户名
Conf: 一个正则表达式match哪些配置资源能够被该用户访问。
Write: 一个正则表达式match哪些配置资源能够被该用户读。
Read: 一个正则表达式match哪些配置资源能够被该用户访问。
6、获取服务器状态信息
服务器状态:rabbitmqctl status ##其中可查看rabbitmq的版本信息
7、获取集群状态信息
rabbitmqctl cluster_status
以上是关于RabbitMQ 消息中间件的主要内容,如果未能解决你的问题,请参考以下文章