RabbitMQ 消息中间件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息中间件相关的知识,希望对你有一定的参考价值。

1、消息中间件

1、简介

消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不同平台之间进行通信,常用来屏蔽各种平台协议之间的特性,实现应用程序之间的协同。优点在于能够在客户端和服务器之间进行同步和异步的连接,并且在任何时刻都可以将消息进行传送和转发,是分布式系统中非常重要的组件,主要用来解决应用耦合、异步通信、流量削峰等问题。

2、作用

1、消息中间件主要作用

  • 解耦
  • 冗余(存储)
  • 扩展性
  • 削峰
  • 可恢复性
  • 顺序保证
  • 缓冲
  • 异步通信

2、消息中间件的两种模式

1、P2P模式

P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。

P2P的特点:

  • 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功
  • 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模

2、Pub/Sub模式

Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点:

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
  • 为了消费消息,订阅者必须保持运行的状态
  • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

3、常用中间件介绍与对比

1、Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

2、RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

3、RocketMQ

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

2、RabbitMQ 详解

1、RabbitMQ 介绍

RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、php、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。

2、RabbitMQ 特点

  • 可靠性
  • 灵活的路由
  • 扩展性
  • 高可用性
  • 多种协议
  • 多语言客户端
  • 管理界面
  • 插件机制

3、AMQP 介绍

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

4、什么和是消息队列

MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

消息队列的使用场景是怎样的?

5、RabbitMQ 应用场景

对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:

1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何防止丢失?

2)如何降低发送者和接收者的耦合度?

3)如何让Priority高的接收者先接到数据?

4)如何做到load balance?有效均衡接收者的负载?

5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。

6)如何做到可扩展,甚至将这个通信模块发到cluster上?

7)如何保证接收者接收到了完整,正确的数据?

AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。

6、RabbitMQ 概念介绍

  • Broker:简单来说就是消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接受消息的程序。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息

技术图片

7、RabbitMQ 使用流程

AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange,并设置相关属性。
  3. 客户端声明一个queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  5. 客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。

技术图片

3、RabbitMQ 单机安装部署

1、安装 erlang
添加yum支持

cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -ivh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
yum install erlang

2、安装RabbitMQ

1、用 yum 安装 RabbitMQ

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
#this example assumes the CentOS 7 version of the package
yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
# this example assumes the CentOS 7 version of the package
yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm

2、用 rpm 手动安装

下载:

wget  https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm

上传rabbitmq-server-3.7.13-1.el7.noarch.rpm文件到/usr/local/src/rabbitmq/

安装:

rpm -ivh rabbitmq-server-3.7.13-1.el7.noarch.rpm

常用命令

service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart 
chkconfig rabbitmq-server on  //设置开机自启

设置配置文件:

cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.7.13/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config

设置用户远程访问:

vim /etc/rabbitmq/rabbitmq.config

找到{}loopack_users , [] }把","去掉
开启web界面管理工具

rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

防火墙设置

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save

5、RabbitMQ常用的命令

1、基本命令

启动监控管理器:rabbitmq-plugins enable rabbitmq_management

关闭监控管理器:rabbitmq-plugins disable rabbitmq_management

启动rabbitmq:rabbitmq-service start

关闭rabbitmq:rabbitmq-service stop

查看所有的队列:rabbitmqctl list_queues

清除所有的队列:rabbitmqctl reset

关闭应用:rabbitmqctl stop_app

启动应用:rabbitmqctl start_app

2、用户和权限设置

添加用户:rabbitmqctl add_user username password

分配角色:rabbitmqctl set_user_tags username administrator

新增虚拟主机:rabbitmqctl add_vhost vhost_name

将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username “.” “.” “.”(后面三个””代表用户拥有配置、写、读全部权限)

3、角色说明

  • 超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  • 监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  • 策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  • 普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  • 其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

    4、RabbitMQ 集群部署及配置

    署 RabbitMQ Cluster

    1、环境要求

1、所有节点需要再同一个局域网内;

2、所有节点需要有相同的 erlang cookie,否则不能正常通信,为了实现cookie内容一致,采用scp的方式进行。

3、准备三台虚拟机,配置相同

rabbitmq01 192.168.101.11

rabbitmq02 192.168.101.12

rabbitmq03 192.168.101.13

操作系统:centos7.5

2、部署过程

1、所有节点配置/etc/hosts

node1 192.168.101.11

node2 192.168.101.12

node3 192.168.101.13

2、所有节点安装 erLang 和 rabbitmq

1、安装erlang

安装依赖包

yum install -y *epel* gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel

编译安装

wget http://erlang.org/download/otp_src_21.3.tar.gz
tar -zxvf otp_src_21.3.tar.gz
cd otp_src_21.3
./configure --prefix=/usr/local/bin/erlang --without-javac
make && make install
echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile
source /etc/profile

出现 erl 命令则说明安装成功;

安装rabbitmq

编译安装

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz
yum install -y xz
xz -d rabbitmq-server-generic-unix-3.6.15.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.15.tar -C /usr/local/bin/
echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile
source /etc/profile

导入 rabbitmq 的管理界面

rabbitmq-plugins enable rabbitmq_management

**设置 erlang

找到erlang cookie文件的位置,官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。

这里将 node1 的该文件复制到 node2、node3,注意这个文件的权限是 400(默认即是400),因此采用scp的方式只拷贝内容即可;

可以通过cat $home/.erlang.cookie来查看三台机器的cookie是否一致,设置erlang的目的是要保证集群内的cookie内容一致。

**使用-detached参数运行各节点

rabbitmqctl stop

rabbitmq-server -detached

然后可以通过 rabbitmqctl cluster_status查看节点状态。

注意:要先拷贝cookie到另外两台机器上,保证三台机器上的cookie是一致的,然后再启动服务。

由于guest这个用户,只能在本地访问,所以我们要新增一个用户并赋予权限:

添加用户并设置密码:

rabbitmqctl add_user  admin 123456

添加权限(使admin用户对虚拟主机“/” 具有所有权限):

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

修改用户角色(加入administrator用户组)

rabbitmqctl set_user_tags admin administrator

然后就可以远程访问了,然后可直接配置用户权限等信息。到此,就可以通过http://ip:15672 使用admin 123456 进行登陆了。

到这里的话,每个节点是作为单独的一台RabbitMQ存在的,也可以正常提供服务了

**组成集群

rabbitmq-server 启动时,会一起启动节点和应用,它预先设置RabbitMQ应用为standalone模式。要将一个节点加入到现有的集群中,你需要停止这个应用,并将节点设置为原始状态。如果使用./rabbitmqctl stop,应用和节点都将被关闭。所以使用rabbitmqctl stop_app仅仅关闭应用。

1、将 node2、node3与 node1 组成集群,这里以node2为例

    node2# rabbitmqctl stop_app      
    node2# rabbitmqctl join_cluster rabbit@node1               ####这里集群的名字一定不要写错了
    node2# rabbitmqctl start_app

2、将node3重复上述操作,也加入node1的集群。

node3# rabbitmqctl stop_app      
node3# rabbitmqctl join_cluster rabbit@node1               ####这里集群的名字一定不要写错了
node3# rabbitmqctl start_app

则此时 node2 与 node3 也会自动建立连接,集群配置完毕;

    #使用内存节点加入集群
    node2 # rabbitmqctl join_cluster --ram rabbit@node1

3、在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功。

node3# rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
 {cluster_name,<"rabbit@node1">},      #集群的名称默认为 rabbit@node1
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]

** 设置镜像队列策略

在任意一个节点上执行如下操作(这里在node1上执行)

首先,在web界面,登陆后,点击“Admin--Virtual Hosts(页面右侧)”,在打开的页面上的下方的“Add a new virtual host”处增加一个虚拟主机,同时给用户“admin”和“guest”均加上权限(在页面直接设置、点点点即可);

然后,在linux中执行如下命令

    rabbitmqctl set_policy -p coresystem  ha-all "^" ‘{"ha-mode":"all"}‘

"coresystem" vhost名称, "^"匹配所有的队列, ha-all 策略名称为ha-all, ‘{"ha-mode":"all"}‘ 策略模式为 all 即复制到所有节点,包含新增节点。

则此时镜像队列设置成功。(这里的虚拟主机coresystem是代码中需要用到的虚拟主机,虚拟主机的作用是做一个消息的隔离,本质上可认为是一个rabbitmq-server,是否增加虚拟主机,增加几个,这是由开发中的业务决定,即有哪几类服务,哪些服务用哪一个虚拟主机,这是一个规划)。
**镜像队列策略设置说明

    rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

    -p Vhost: 可选参数,针对指定vhost下的queue进行设置
    Name: policy的名称
    Pattern: queue的匹配模式(正则表达式)
    Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
        ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
            all:表示在集群中所有的节点上进行镜像
            exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
            nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
        ha-params:ha-mode模式需要用到的参数
        ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
    priority:可选参数,policy的优先级

将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直。完成这 6 个步骤后,RabbitMQ 高可用集群搭建完成,最后一个步骤就是搭建均衡器。
**安装并配置负载均衡器HA

注意:如果使用阿里云,可以使用阿里云的内网slb来实现负载均衡,不用自己搭建HA。

1、在192.168.101.11安装HAProxy

    yum -y install HAProxy

2、修改 /etc/haproxy/haproxy.cfg

    vim /etc/haproxy/haproxy.cfg
    global 

        log         127.0.0.1 local2

        chroot      /var/lib/haproxy
        pidfile     /var/run/haproxy.pid
        maxconn     4000
        user        haproxy
        group       haproxy
        daemon

        stats socket /var/lib/haproxy/stats

    defaults 
           log        global 
           mode       tcp 
           option     tcplog 
           option     dontlognull 
           retries    3 
           option redispatch 
           maxconn 2000 
           contimeout      5s 
           clitimeout      120s 
           srvtimeout      120s 

    listen rabbitmq_cluster 192.168.101.11:5670
           mode      tcp 
           balance roundrobin 
           server rabbit1  192.168.101.11:5672 check inter 5000 rise 2 fall 2 
           server rabbit2  192.168.101.12:5672 check inter 5000 rise 2 fall 2        

3、重启HAProxy

    service haproxy restart

登录浏览器输入地址http://192.168.101.11:8100/rabbitmqstats查看HAProxy的状态

三、常见问题

常见错误:

1、使用 rabbitmq-server -detached命令启动rabbitmq时,出现以下提示Warning: PID file not written; -detached was passed,此时使用rabbitmqctl status提示服务已启动,可知此问题不用解决。

2、由于更改hostname文件,在每次rabbitmqctl stop或者rabbitmqctl cluster_status等,只要是rabbitmq的命令就报错,提示大概如下

    Cluster status of node rabbit@web2 ...
    Error: unable to connect to node rabbit@web2: nodedown

    DIAGNOSTICS
    ===========

    attempted to contact: [rabbit@web2]

    rabbit@web2:
    ? * connected to epmd (port 4369) on web2
    ? * epmd reports node ‘rabbit‘ running on port 25672
    ? * TCP connection succeeded but Erlang distribution failed

    ? * Hostname mismatch: node "rabbit@mq2" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@mq2"

    current node details:
    - node name: ‘rabbitmq-cli-11@web2‘
    - home dir: /root
    - cookie hash: SGwxMdJ3PjEXG1asIEFpBg==

此时先ps aux | grep mq,然后kill -9 该进程,然后再rabbitmq-server -detached即可解决。(即先强杀,再重新启动)

3、使用rabbitmqctl stop,rabbitmq-server -detached重新启动后,原先添加的用户admin、虚拟主机coresystem等均丢失,还需要重新添加。

采用脚本启动,在脚本中写好启动好需要加载的各配置项(创建admin用户并授权,创建虚拟主机并授权,配置镜像队列)。

4、命令

    rabbitmqctl stop_app        #仅关闭应用,不关闭节点
    rabbitmqctl start_app       #开启应用
    rabbitmq--server -detached  #启动节点和应用
    rabbitmqctl stop            #关闭节点和应用

4、常用命令:

Rabbitmq服务器的主要通过rabbitmqctl和rabbimq-plugins两个工具来管理,以下是一些常用功能。

1、 服务器启动与关闭

     启动: rabbitmq-server –detached
      关闭: rabbitmqctl stop
      若单机有多个实例,则在rabbitmqctl后加 –n 指定名称

2、插件管理

      开启某个插件:rabbitmq-plugins enable  xxx
      关闭某个插件:rabbitmq-plugins disable xxx
      注意:重启服务器后生效。

3、virtual_host管理

      新建virtual_host:rabbitmqctl add_vhost  xxx
      撤销virtual_host:rabbitmqctl  delete_vhost xxx 

4、用户管理

      新建用户:rabbitmqctl add_user xxxpwd
      删除用户: rabbitmqctl delete_user xxx
      查看用户:rabbitmqctl list_users
      改密码: rabbimqctl change_password {username} {newpassword}
      设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}
              Tag可以为 administrator,monitoring, management       

5、 权限管理

      权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}
      Vhostpath: Vhost路径
      user: 用户名
      Conf: 一个正则表达式match哪些配置资源能够被该用户访问。
      Write: 一个正则表达式match哪些配置资源能够被该用户读。
      Read: 一个正则表达式match哪些配置资源能够被该用户访问。

6、获取服务器状态信息

     服务器状态:rabbitmqctl status     ##其中可查看rabbitmq的版本信息

7、获取集群状态信息

rabbitmqctl cluster_status

以上是关于RabbitMQ 消息中间件的主要内容,如果未能解决你的问题,请参考以下文章

结合生活案例实现rabbitmq消息通信

RabbitMQ消息中间件

消息中间件RabbitMQ

消息中间件-----RabbitMq消息可靠传递

揭开消息中间件RabbitMQ的神秘面纱

十次方项目第五天(消息中间件RabbitMQ)