初识 RabbitMQ
Posted daydreamed
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识 RabbitMQ相关的知识,希望对你有一定的参考价值。
初识 RabbitMQ
1、概述
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
2、架构
-
Broker
代理人,消息队列的主体,负责管理 消息 的接收、发送和存储。
-
VHost
虚拟主机,发挥 消息队列 内部系统隔离的作用。
-
Exchange
交换机,完成 消息 的特定传递。
常用有以下三种:
-
DirectExchange
直接交换机,点对点传输。
-
FanoutExchange
扇出交换机,广播传输。
-
TopicExchange
主题交换机,可根据 routing key 完成分组传输。
-
-
Queue
队列,存储消息的地方。
-
Binding
绑定关系,交换机 和 交换机、交换机 和 队列 都可完成绑定。
-
Message(routing key)
消息,指定 路由键(routing key)可使 消息 传递到特定的队列。
-
Connect
连接,客户端 和 Broker 的连接是一种长连接,消息 在该连接中的一条 信道(channel)中完成传输。
-
Producer
生产者,生产消息。
-
Consumer
消费者,消费消息。
3、SpringBoot 整合
/**
* 1、创建 Exchange、Queue、Binding
* 1.1、AmqpAdmin
* 2、收发消息
* 2.1、RabbitTemplate
* 2.2、RabbitListener 类、方法
* 2.3、RabbitHandler 方法
* 2.3.1 用于区分 同一队列中的不同消息,不同队列中的不同消息。
* 3、消息确认机制
* 3.1、发送方
* 3.1.1、confirmCallback 确认模式
* producer -> Exchange
* 3.1.2、returnCallback 回退模式
* Exchange -> Queue
* 3.2、接收方
* 3.2.1、ACK机制
* Queue -> Consumer
*/
3.1、依赖
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2、创建交换机、队列、绑定关系
@Test
void createExchange()
/*
DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
直接交换机 点对点
*/
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("DirectExchange 创建成功");
@Test
void createQueue()
/*
Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
队列
*/
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue 创建成功");
@Test
void createBinding()
/*
Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments)
绑定关系
*/
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hello-java-exchange", "hello-java", null);
amqpAdmin.declareBinding(binding);
log.info("Binding 创建成功");
3.3、收发消息
3.3.1、发送消息
@Test
void sendMessageString()
/*
void convertAndSend(String exchange, String routingKey, Object object);
*/
rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", "hello world");
log.info("消息已发送");
@Test
void sendMessageObj()
/*
对象需要实现 Serializable 接口
*/
Data data = new Data();
data.setId("1");
data.setName("小米");
data.setTime(LocalDateTime.now());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
log.info("消息已发送");
@Test
void sendMessageJson()
/*
配置 MessageConverter -> new Jackson2JsonMessageConverter()
*/
Data data = new Data();
data.setId("1");
data.setName("小米");
data.setTime(LocalDateTime.now());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
log.info("消息已发送");
@lombok.Data
static class Data implements Serializable
private String id;
private String name;
private LocalDateTime time;
3.3.2、接收消息
注:启动类需要添加 @EnableRabbit 注解。
/*
Message message, Data content, Channel channel
1、客户端 接收到消息后,Queue 中该消息会自动删除
2、客户端 有序接收消息
*/
@RabbitListener(queues = "hello-java-queue")
void receiveMessage(Message message, Data content, Channel channel)
System.out.println("接收到消息,内容为:" + message + ", 类型为:" + message.getClass());
@Service
@RabbitListener(queues = "hello-java-queue")
class TestService
@RabbitHandler
void manualAck(Message message, Data content, Channel channel) throws IOException
long deliveryTag = message.getMessageProperties().getDeliveryTag();
/*
签收
void basicAck(long deliveryTag, boolean multiple);
deliveryTag 通道自增
multiple 是否为批量模式
拒收
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
requeue 是否重新入队
void channel.basicReject(long deliveryTag, boolean multiple, boolean requeue);
*/
channel.basicAck(deliveryTag, false);
// channel.basicNack(deliveryTag, false, true);
// channel.basicReject(deliveryTag, true);
3.4、配置
3.4.1、配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
# 异步回调 returnCallback
mandatory: true
listener:
simple:
# 手动 ack
acknowledge-mode: manual
3.4.2、配置类
package com.hong.changfeng.information.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class RabbitConfig
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate(rabbitTemplate);
return rabbitTemplate;
/**
* 使用 JSON 序列化机制 进行消息转换
*/
@Bean
public MessageConverter messageConverter()
return new Jackson2JsonMessageConverter();
/**
* 定制 RabbitTemplate
*/
public void initRabbitTemplate(RabbitTemplate rabbitTemplate)
// 设置 消息到达 Broker 的确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
/**
* 消息到达 Broker 时触发
*
* @param correlationData 当前消息的唯一关联数据(id)
* @param ack 消息是否到达 Broker(Exchange)
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
log.debug("RabbitConfig -> initRabbitTemplate " +
" correlationData: " + correlationData + " ack: " + ack + " cause: " + cause);
);
// 设置 消息到达 Queue 的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback()
/**
* 消息未到达 Queue 时触发
*
* @param message 消息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
log.debug("RabbitConfig -> initRabbitTemplate " +
" message: " + message + " replyCode: " + replyCode + " replyText: " + replyText +
" exchange: " + exchange + " routingKey: " + routingKey);
);
初识RabbitMQ
1.安装
rabbitmq官网:http://www.rabbitmq.com/
下载地址:https://packagecloud.io/rabbitmq
下载rabbitmq-server
安装脚本文件
# curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | bash
安装rabbitmq
# yum install rabbitmq-server -y
安装
报错如下:
Resolving Dependencies --> Running transaction check ---> Package rabbitmq-server.noarch 0:3.7.7-1.el6 will be installed --> Processing Dependency: erlang >= 19.3 for package: rabbitmq-server-3.7.7-1.el6.noarch --> Processing Dependency: socat for package: rabbitmq-server-3.7.7-1.el6.noarch --> Finished Dependency Resolution Error: Package: rabbitmq-server-3.7.7-1.el6.noarch (rabbitmq_rabbitmq-server) Requires: erlang >= 19.3 Error: Package: rabbitmq-server-3.7.7-1.el6.noarch (rabbitmq_rabbitmq-server) Requires: socat You could try using --skip-broken to work around the problem You could try running: rpm -Va --nofiles --nodigest #
意思是要安装rabbitmq-server,必须先安装erlang才行 安装rabbitmq必须先安装Erlang,版本信息可以参照:http://www.rabbitmq.com/which-erlang.html
版本信息如下:
安装Erlang
网址:https://packagecloud.io/rabbitmq/erlang
下载脚本文件
# curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | bash
安装erlang
# yum install erlang
此路很坑,需要多试几次
Downloading Packages: https://packagecloud.io/rabbitmq/erlang/el/6/x86_64/erlang-21.0.4-1.el6.x86_64.rpm: [Errno 14] problem making ssl connection Trying other mirror. Error Downloading Packages: erlang-21.0.4-1.el6.x86_64: failure: erlang-21.0.4-1.el6.x86_64.rpm from rabbitmq_erlang: [Errno 256] No more mirrors to try. [[email protected] soft]#
安装完erlang后继续安装rabbitmq-server
# yum install rabbitmq-server -y
报错如下:
... Loading mirror speeds from cached hostfile * base: mirrors.tuna.tsinghua.edu.cn * extras: mirrors.163.com * updates: mirrors.tuna.tsinghua.edu.cn No package socat available. Error: Nothing to do
导入阿里云源,然后安装socat,
网址:https://opsx.alibaba.com/mirror
操作方法:
CentOS 1、备份 mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup 2、下载新的CentOS-Base.repo 到/etc/yum.repos.d/ CentOS 5 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-5.repo 或者 curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-5.repo CentOS 6 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo 或者 curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo CentOS 7 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 或者 curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 3、之后运行yum makecache生成缓存
安装阿里云源
# wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo # yum makecache
安装socat
# yum install socat -y
安装rabbitmq-server
如果不行的话,可以参照文档:https://blog.csdn.net/chenkui199/article/details/76254871
# yum install rabbitmq-server
安装之路一路坑呀。。。
2.rabbitmq相关概念介绍
AMQP:一个提供统一消息服务的应用层标准高级消息队列协议
RabbitMQ相关概念:
生产者创建消息,然后发布到RabbitMQ中,消息分为两个部分:信息体和标签,rabbitmq会根据标签把消息发送给消费者。
名词解释:
Producer:生产者 简称P
Consumer:消费者 简称C
Broker:消息中间件的服务节点(一个节点可以通俗的理解为一台rabbitmq服务器)
Queue:队列
Exchange:交换机
RoutingKey:路由键 生产者将消息发送给交换机时,会指定一个RountingKey,key需要与交换机类型进行绑定键(BindingKey)联合使用才能够最终生效
Binding:绑定
通信过程和消费过程
生产 P1连接到RabbitMQ Broker,建立一个连接,开启一个信道,声明一个交换机,声明一个队列,通过RoutingKey将交换机和队列进行绑定, P1将信息发送至RabbitMQ Broker,包含RoutingKey等信息 交换机根据RoutingKey找到相应的队列,并且将信息发送给队列 关闭信号 关闭连接 消费 C1连接至RabbitMQ Broker上,建立一个连接,开启一个信道 C1向RabbitMQ Broker请求消费的队列中的信息 队列回应并且投递信息 C1接收信息,向队列确认收到信息 RabbitMQ从队列中删除已经确认的信息 关闭信号 关闭连接
3.利用python代码实现发送队列和接受队列
生产者代码
本实例,采用python去编写,需要预先安装pika(pip install pika)即可
如果没有python环境,快速搭建一个python环境就可以了
下载python包: # wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz 解压: # tar xf Python-3.6.6.tgz 安装必要依赖包 # yum install gcc gcc-c++ zlib zlib-devel openssl openssl-devel -y 源码安装 # cd Python-3.6.6 # ./configure --prefix=/usr/local/python3 # make # make install 安装pika包 # /usr/local/python3/bin/pip3 install pika 如果是想使用系统自带的python 2.6,但是没有安装pip的话,可以参考 网址:https://pypi.org/project/pip/#files 下载pip包: # wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz 解压: # tar xf pip-18.0.tar.gz 安装: # cd pip-18.0 # python setup.py install 如果报setuptools错误,则看下面 网址:https://pypi.org/project/setuptools/#files 下载setuptools包 # https://files.pythonhosted.org/packages/d3/3e/1d74cdcb393b68ab9ee18d78c11ae6df8447099f55fe86ee842f9c5b166c/setuptools-40.0.0.zip 使用unzip解压 如果没有unzip软件的话,就使用yum install unzip -y 安装即可 # unzip setuptools-40.0.0.zip # cd setuptools-40.0.0 # python setup.py install
生产者代码:
查看例子:
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://pypi.org/project/pika
https://github.com/pika/pika
部分截图如下:
代码实现:
#!/usr/local/python3/bin/python3 import pika conn = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘,credentials=(pika.PlainCredentials(‘guest‘,‘guest‘)))) chan = conn.channel() chan.queue_declare(queue=‘test‘) chan.basic_publish(exchange=‘‘,routing_key=‘test‘,body=‘Hello World‘) conn.close()
其中rabbit默认的用户是guest密码也是guest
执行代码后
# chmod 755 send_sample_rabbitmq_message.py # ./send_sample_rabbitmq_message.py
可以使用rabbitmqctl list_queues看到队列信息
# rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... test 1 #
可以看到,rabitmq生产者信息已经给到队列了
消费者代码:
github例子:
代码实现:
刚已经已经发送了队列,现在用此代码获取下
# chmod 755 get_sample_rabbitmq_message.py [[email protected] python_script]# ./get_sample_rabbitmq_message.py <Basic.Deliver([‘consumer_tag=ctag1.ca8ba5916dd24d8e9e5ab383a1d5a032‘, ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=test‘])> <BasicProperties> b‘Hello World‘ #
如果队列中还没消息的话,程序会一直等待,等待rabbitmq将信息发送至队列,然后它再获取
5.RabbitMQ的管理
rabbitmqctl管理rabbitmq
参考文档:http://www.rabbitmq.com/man/rabbitmqctl.8.html
rabbitmqctl — rabbitmq的命令管理行工具 用法: rabbitmqctl [-q] [-l] [-n node] [-t timeout] command [command_options] OPTIONS: -q: --quiet Quiet output mode is selected. Informational messages are suppressed when quiet mode is in effect. 静默的输出模式,提示性信息被隐藏 -t timeout, --timeout timeout 超时命令,仅对list命令有效 -q 参数 例如: 普通模式 # rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... test 2 # 静默模式 # rabbitmqctl -q list_queues test 2 # -t 参数 没有模拟出实际效果出来 command: -l 打印所有有用的命令 例如: # rabbitmqctl -l 即可打印有用的命令执行参数 User Management 用户管理 add_user username password 创建用户名和密码 例如: # rabbitmqctl add_user tonyg 123 Adding user "tonyg" ... list_users 列出用户 例如: # rabbitmqctl list_users Listing users ... guest [administrator] tonyg [] # change_password username newpassword 修改用户密码 例如: # rabbitmqctl change_password tonyg 456 Changing password for user "tonyg" ... # clear_password username 清楚用户名的密码 例如: # rabbitmqctl clear_password tonyg Clearing password for user "tonyg" ... # authenticate_user username password 例如: 待测试 set_user_tags username [tag ...] 设置用户名的标签 针对于Management Plugin,tag标签有5种 标签有五种: none:没有任何权限 management:可以在AMQP上做任何事情,查看自己虚拟主机的队列、交换机等 policymaker:management的都能够做,还能够查看,创建和删除可通过AMQP登录的虚拟主机的策略和参数 monitoring:management的都能够做,还可以列出所有的虚拟主机,不仅仅从AMQP登录的,查看其它用户的链接情况,查看节点数据等 administrator:所有权限 例如: # rabbitmqctl set_user_tags tonyg administrator Setting tags for user "tonyg" to [administrator] ... 查看用户标签 # rabbitmqctl list_users | grep tonyg tonyg [administrator] # delete_user username 删除用户 例如: # rabbitmqctl delete_user tonyg Deleting user "tonyg" ... # Access Control 访问控制 add_vhost vhost 增加vhost 例如: # rabbitmqctl add_vhost test Adding vhost "test" ... # list_vhosts [vhostinfoitem ...] 列出vhost # rabbitmqctl list_vhosts Listing vhosts ... / test # set_permissions [-p vhost] user conf write read 设置用户权限 例如: 配置用户tonyg对vhost test具有全部权限,即可读可写 # rabbitmqctl set_permissions -p test tonyg ".*" ".*" ".*" Setting permissions for user "tonyg" in vhost "test" ... # 配置用户tonyg对vhost / 且资源名称以test开头的资源具有可读可写权限 # rabbitmqctl set_permissions -p / tonyg "test.*" ".*" ".*" Setting permissions for user "tonyg" in vhost "/" ... # list_permissions [-p vhost] 列出虚拟主机允许访问权限的用户以及相应的权限 例如: # rabbitmqctl list_permissions -p / Listing permissions for vhost "/" ... guest .* .* .* tonyg test.* .* .* # list_user_permissions username 列出用户的权限 例如: # rabbitmqctl list_user_permissions tonyg Listing permissions for user "tonyg" ... test .* .* .* / test.* .* .* # set_topic_permissions [-p vhost] user exchange write read 这个参数还没有看明白... 详细信息如下: set_topic_permissions [-p vhost] user exchange write read vhost The name of the virtual host to which to grant the user access, defaulting to “/”. user The name of the user the permissions apply to in the target virtual host. exchange The name of the topic exchange the authorisation check will be applied to. write A regular expression matching the routing key of the published message. read A regular expression matching the routing key of the consumed message. Sets user topic permissions. For example, this command instructs the RabbitMQ broker to let the user named “tonyg” publish and consume messages going through the “amp.topic” exchange of the “/myvhost” virtual host with a routing key starting with “tonyg-”: rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic “^tonyg-.*” “^tonyg-.*” Topic permissions support variable expansion for the following variables: username, vhost, and client_id. Note that client_id is expanded only when using MQTT. The previous example could be made more generic by using “^{username}-.*”: rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic “^{username}-.*” “^{username}-.*” Server Status 服务状态 list_queues [-p vhost] [--offline | --online | --local] [queueinfoitem ...] 返回队列的状态信息 --offline:仅列出当前不可用的持久队列(节点不可用) --online:列出当前可用的队列 --local:仅列出主进程位于当前节点上的队列 queueinfoitem: name:名称 durable:是否可持久化 auto_delete:是否不再使用队列时自动删除队列 arguments:队列参数 messages_ready:准备传递给客户端的消息数 messages_unacknowledged:传递给客户端但是未被确认的消息数 messages:准备好和未确认消息的总和 consumers:消费者数量 例如: # rabbitmqctl list_queues -p / --online Timeout: 60.0 seconds ... Listing queues for vhost / ... test 2 # 查看vhost / 的队列是否有传递给客户端但是未被确认的消息数 # rabbitmqctl list_queues -p / messages_unacknowledged Timeout: 60.0 seconds ... Listing queues for vhost / ... 0 # 如果此参数不是为0,则需要注意了,队列会越累积越多的,最后可能会导致rabbitmq的崩塌 list_exchanges [ -p vhost ] [ exchangeinfoitem ... ] 返回交换机的信息 exchangeinfoitem: name:返回交换机的名称 type:返回交换机的类型 direct topic headers fanout durable:当交换机重启后是否仍然存在 auto_delete:当交换机不再使用时,是否是自动删除 internal:查看交换机是否是内部的,即不能由客户直接发布 例如: # rabbitmqctl list_exchanges Listing exchanges for vhost / ... amq.direct direct amq.topic topic amq.match headers amq.headers headers amq.fanout fanout amq.rabbitmq.trace topic direct # 查看交换机重启后是否仍然存在 # rabbitmqctl list_exchanges name durable Listing exchanges for vhost / ... amq.direct true amq.topic true amq.match true amq.headers true amq.fanout true amq.rabbitmq.trace true true # list_bindings [ -p vhost ] [ bindinginfoitem ... ] 返回绑定信息 bindinginfoitem: source_name:附加绑定信息源的信息 source_kind:附加绑定信息源的类型 destination_name:附加绑定信息的目标名称 destination_kind:附加绑定的消息的目标类型 routing_key:绑定的路由秘钥 arguments:绑定的参数 例如: # rabbitmqctl list_bindings Listing bindings for vhost /... exchange test queue test [] # list_connections [ connectioninfoitem ... ] 返回TCP/IP连接统计信息 connectioninfoitem: name:连接的名称 port:服务器端口 host:主机名称 例如: 返回连接的信息: # rabbitmqctl list_connections name port host Listing connections ... 127.0.0.1:57258 -> 127.0.0.1:5672 5672 127.0.0.1 # list_channels [ channelinfoitem ... ] 返回有关通道的信息 channelinfoitem: connection:与通道有关所属的连接管理的ID name:频道的可读名称 number:通道的编号 user:关联的用户名 vhost:与通道的运行主机 consumer_count:通过通道检索消息的逻辑AMQP使用者数。 messages_unacknowledged:通过此渠道发送但尚未确认的消息数。 messages_uncommitted:在尚未提交的事务中收到的消息数。 acks_uncommitted:尚未提交的交易中收到的确认数量。 例如: 查看当前的通道信息 # rabbitmqctl list_channels name number user vhost consumer_count messages_unacknowledged Listing channels ... 127.0.0.1:57258 -> 127.0.0.1:5672 (1) 1 guest / 1 0 # list_consumers [ -p vhost ] 列出消费者信息 例如: # rabbitmqctl list_consumers Listing consumers on vhost / ... test <[email protected]3.1592.0> ctag1.e1733293b7f9419583e67260e562f424 true 0 [] # node_health_check rabbitmq节点的运行状况 例如: # rabbitmqctl node_health_check Timeout: 70 seconds ... Checking health of node [email protected] ... Health check passed #
6.RabbitMQ运维
web管理界面
启用插件 # rabbitmq-plugins enable rabbitmq_management 目前还未发现不重启rabbitmq让rabbitmq_managemnet生效的办法 开放端口,重启rabbitmq # iptables -I INPUT -p tcp --dport 15672 -j ACCEPT # iptables -I INPUT -p udp --dport 15672 -j ACCEPT # /etc/init.d/iptables save # /etc/init.d/rabbitmq-server restart 登录网页 http://hostname:15672就可以登录了
guest只能够通过localhost进行访问
创建用户
# rabbitmqctl add_user test test
赋予tag
# rabbitmqctl set_user_tags test administrator
添加用户后就可以使用Test用户登录了,
Queued messages Ready:可以传递的消息数 Unacknowledged:未确认的消息数 Total:总的消息数 Message rates Publish:信息进入服务器的速率 Publisher confirm:服务器确认发布的速率 Return:将速率返回给发布者 Disk read:队列从磁盘读取速度 Disk write:队列从磁盘写入速度 需要注意的是,Unacknowledged一把情况下为0,若不为0,则证明危险了,rabbitmq可能消息会越积越多,导致系统崩掉 Global counts 返回的是总对象数 连接数、信道数、交换机数、队列数、消费者数等 Nodes 节点信息(节点可以粗略的看做是一台服务器) File descriptors:文件描述符(# ulimit -n可以显示当前系统的文件秒速符,也可更改) Socket descriptors:socket文件描述符数量 Erlang processes:Erlang进程的数量 Memory:内存 Disk space:磁盘剩余 Uptime:启动时间
RabbitMQ配置文件
如果不确定配置是否有配置文件,可以查看rabbitmq的log # head -n 15 /var/log/rabbitmq/rabbit@localhost.log ... Starting RabbitMQ 3.7.7 on Erlang 21.0.4 Copyright (C) 2007-2018 Pivotal Software, Inc. Licensed under the MPL. See http://www.rabbitmq.com/ ... node : [email protected] home dir : /var/lib/rabbitmq config file(s) : (none) cookie hash : C1eEyzogKIXagAsaVuyVLw== log(s) : /var/log/rabbitmq/[email protected] : /var/log/rabbitmq/[email protected]_upgrade.log database dir : /var/lib/rabbitmq/mnesia/[email protected] ... #
可以看到conf file(s)这行,值为none,说明还没有配置rabbitmq的配置文件
configure说明:http://www.rabbitmq.com/configure.html
rabbitmq.conf.example文件:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example
rabbitmq.conf 和 rabbitmq-env.conf 位置官网如下图:
默认rabbitmq是没有默认配置文件的,需要创建,可以将github上面的conf文件复制进/etc/rabbitmq/rabbitmq.conf中
配置文件说明:
参考内容:http://www.rabbitmq.com/configure.html#config-items
关于网络部分的配置:
参考内容:http://www.rabbitmq.com/networking.html
默认的话,rabbitmq监听所有IP,端口为5672 isteners.tcp.default = 5672 也可以对IP进行监听 listeners.tcp.local = 127.0.0.1:5672 rabbitmq可以设置多个监听,比如:监听ipv4的192.168.56.209的5672端口和ipv6的5672端口 listeners.tcp.1 = 192.168.56.209:5672 listeners.tcp.local_v6 = ::1:5672 TCP监听Erlang的进程数,默认是10个 num_acceptors.tcp = 10 AMQP timeout时间,单位为毫秒 handshake_timeout = 10000 是否启动DNS反向查找,默认为False reverse_dns_lookups = true
关于安全的配置:
关于TLS配置,参考文档:http://www.rabbitmq.com/ssl.html#enabling-ssl
客户提供的SASL身份验证机制参考文档:https://yq.aliyun.com/articles/41959
身份验证和授权(表示不理解),参考文档:http://www.rabbitmq.com/access-control.html#loopback-users
TLS配置 关于TLS配置,参考文档:http://www.rabbitmq.com/ssl.html#enabling-ssl ssl_options.verify = verify_peer ssl_options.fail_if_no_peer_cert = false ssl_options.cacertfile = /path/to/cacert.pem ssl_options.certfile = /path/to/cert.pem ssl_options.keyfile = /path/to/key.pem 向客户提供的SASL身份验证机制,默认PLAIN、AMQPLAIN auth_mechanisms.1 = PLAIN auth_mechanisms.2 = AMQPLAIN auth_backends.1 = rabbit_auth_backend_ldap 关于默认User / VHost配置 rabbitmq默认的虚拟主机,默认/ default_vhost = / rabbitmq默认的用户名,默认guest default_user = guest rabbitmq默认密码,默认guest default_pass = guest 默认分配的权限,默认".*".".*",".*" default_permissions.configure = .* default_permissions.read = .* default_permissions.write = .*
关于网络协议相关配置:
heartbeat延迟,默认为60秒,若设置为0,则不能遵守服务器协议,若设置为0,可能会在大量连接的情况下提高性能,但可能会导致存在非活动连接的网络设备时连接中断。 heartbeat = 60 最大允许通道数,默认为2047(官网上的写的是2047),若设置为0,则代表不做限制,可能会出现通道泄露,官方建议不设置为0 channel_max = 128
资源限制于流量控制:
内存的阈值,默认为0.4,0.4是相对的值40%,默认为0.4 vm_memory_high_watermark.relative = 0.4 可以使用如下绝对值,2G vm_memory_high_watermark.absolute = 2GB 队列分页磁盘的值,默认0.5 vm_memory_high_watermark_paging_ratio = 0.5 RabbitMQ存储在分页磁盘的可用空间限制,当可用磁盘低于此限制时,将触发流量控制,默认50M disk_free_limit.absolute = 50mb
MISC/Advices配置:
是否启用Erlang即时编译器,默认为flase,若修改为true,可能在启动时延迟几分钟,用来增加服务的吞吐量,如果HiPE没有编译到Erlang中,会显示警告信息,启动正常起来,但是在windows平台上,HiPE不可用 hipe_compile = false 消息字节大小,若低于该大小,消息将直接嵌入到队列索引中,默认值4kb queue_index_embed_msgs_below = 4096
Management配置:
参考文档:http://www.rabbitmq.com/management.html#configuration
配置management端口 management.listener.port = 15672
如上就是配置文件的大概
官方建议的服务器配置:
参考文档:http://www.rabbitmq.com/production-checklist.html
针对于虚拟追、用户、权限 虚拟主机:如果不是搭建集群,且服务于单个系统时,默认虚拟主机 / 足够了 如果是有多用户时,建议使用单独的vhost 用户:删除默认用户guest,为每个应用程序分配独立的用户 资源限制: 内存: 保持默认40%即可,默认情况下,当rabbitmq检测到它使用超过40%的可用内存时,它将不接受任何消息,这是一个安全默认值,修改时要小心一点,因为操作系统和文件系统也要使用内存来加速所有进程操作,如果内存太小,可能会导致rabbitmq进程被Kill掉 官方建议内存的范围为0.4——0.6,不可超过0.7,因为系统必须留有至少30%的内存来处理其他进程。 磁盘空间 默认disk_free_limit是50M,默认适用于开发和教学,生产部署需要更高的磁盘空间,因为空间不足可能会导致节点故障,数据丢失等 官方建议, 1.disk_free_limit设置值是总内存大小,当磁盘空间低于此大小时,所有发布者都将被阻止,并且不会受到任何消息 2.disK_free_limit设置为内存的1.5倍,是一个更安全的生产价值 3.disk_free_limit设置为内存的2倍,是最保守的生产价值
7.利用RabbitMQ提供的API进行监控
rabbitmq提供的api例子:http://127.0.0.1:15672/api/
例如如下api所代表的含义:
这里就介绍nodes 和 overview api
nodes:
查看:
# curl -i -u test:test http://localhost:15672/api/nodes
内容如下:
overview也是类似的
api所监视的工具其实和网页版监视是一样的,只不过rabbitmq提供了一个接口,允许写代码去调用这个接口而已。
可以使用python代码来实现此功能:
#!/usr/local/python3/bin/python3 import requests import json import sys def getjson_nodes(): #定义url url = "http://localhost:15672/api/nodes" #获取内容,并且格式化为str r = requests.get(url=url,auth=(‘test‘,‘test‘)).text #去除头和尾的 [] 符号,因为头和尾[] 不符合json规范 text = r.strip(‘[‘).strip(‘]‘) #返回str return text def getjson_overview(): #和上面类似 url = "http://localhost:15672/api/overview" r = requests.get(url=url,auth=(‘test‘,‘test‘)).text text = r.strip(‘[‘).strip(‘]‘) return text def get_runqueue(jsondata): return jsondata["run_queue"] def get_fd_used(jsondata): fd_total = int(jsondata["fd_total"]) fd_used = int(jsondata["fd_used"]) fd_used_per = round((1.0 * fd_used / fd_total * 100),2) return fd_used_per def get_uptime(jsondata): return jsondata["uptime"] def get_socket_used(jsondata): sockets_total = int(jsondata["sockets_total"]) sockets_used = int(jsondata["sockets_used"]) sockets_used_per = round((1.0 * sockets_used / sockets_total * 100),2) return sockets_used_per def get_rabbitmq_version(jsondata): return jsondata["rabbitmq_version"] def get_erlang_version(jsondata): return jsondata["erlang_version"] def get_rabbitmq_node(jsondata): return jsondata["node"] def get_queue_messages_unacknowledged(jsondata): queue_total = jsondata["queue_totals"] if ‘messages_unacknowledged‘ in queue_total: return queue_total["messages_unacknowledged"] else: return 0 def get_queue_messages_ready(jsondata): queue_total = jsondata["queue_totals"] if ‘messages_ready‘ in queue_total: return queue_total["messages_ready"] else: return 0 def get_message_stats_publish(jsondata): message_stats = jsondata["message_stats"] if ‘publish‘ in message_stats: return message_stats["publish"] else: return 0 def get_message_stats_no_ack(jsondata): message_stats = jsondata["message_stats"] if ‘get_no_ack‘ in message_stats: return message_status["get_no_ack"] else: return 0 #定义main方法 def main(): #定义key的值,是python脚本的第一个参数 key = sys.argv[1] #返回nodes的json文档 jsondata_nodes = json.loads(getjson_nodes()) #返回overview的json文档 jsondata_overview = json.loads(getjson_overview()) #获取队列的运行数 if key == "get_runqueue": print (get_runqueue(jsondata_nodes)) #获取fd的使用率 elif key == "get_fd_used": print (get_fd_used(jsondata_nodes)) #获取运行时间 elif key == "get_uptime": print (get_uptime(jsondata_nodes)) #获取socket的使用率 elif key == "get_socket_used": print (get_socket_used(jsondata_nodes)) #获取rabbitmq的版本 elif key == "get_rabbitmq_version": print (get_rabbitmq_version(jsondata_overview)) #获取erlang的版本 elif key == "get_erlang_version": print (get_erlang_version(jsondata_overview)) #node的内容 elif key == "get_rabbitmq_node": print (get_rabbitmq_node(jsondata_overview)) #获取未确认的message个数 elif key == "get_queue_messages_unacknowledged": print (get_queue_messages_unacknowledged(jsondata_overview)) #获取队列的reday数量 elif key == "get_queue_messages_ready": print (get_queue_messages_unacknowledged(jsondata_overview)) #获取messate状态信息 elif key == "get_message_stats_publish": print (get_message_stats_publish(jsondata_overview)) elif key == "get_message_stats_no_ack": print (get_message_stats_publish(jsondata_overview)) #如果不符合上面的规则,那就打印错误 else: print ("Using:api.py {get_runqueue|get_fd_used|get_uptime|get_socket_used|get_rabbitmq_version|get_erlang_version|get_rabbitmq_node|get_queue_messages_unacknowledged|get_queue_messages_ready|get_message_stats_publish|get_message_stats_no_ack}") if __name__ == "__main__" : main()
可以让python脚本结合zabbix来实现对rabbitmq的监控,可以参考:https://github.com/jasonmcintosh/rabbitmq-zabbix
终于对rabbitmq有了一个基本的了解了,加油!
以上是关于初识 RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQRabbitMQ和Erlang下载与安装步骤—2023超详细最新版