Rabbitmq简单使用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq简单使用相关的知识,希望对你有一定的参考价值。
Rabblimq? AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
? 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、php、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
安装
环境
CentOS Linux release 7.4.1708 (Core)
主机名:lek01
IP地址:172.16.50.104
Rabbitmq version: 3.7.3
配置erlang源
/etc/yum.repos.d/erlang.repo
[erlang]
gpgcheck=0
humanname=erlang Repository
baseurl=http://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
name=erlang Repository
安装
yum install https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.5/rabbitmq-server-3.7.5-1.el7.noarch.rpm -y
配置Rabbitmq(可选)
添加/etc/rabbitmq/rabbitmq-env.conf
[email protected]
启动
systemctl enable rabbitmq-server && systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management #启动 web ui 插件
命令参考
语法:rabbitmqctl -n <node> -l <command> [<command options>]
-n node
-q quiet
-t timeout
-l longnames
默认 node "[email protected]",server为主机名称
虚拟主机默认是 ‘’‘/’
Commands:
# 添加用户
add_user <username> <password>
# 添加虚拟主机
add_vhost <vhost>
# 验证用户密码
authenticate_user <username> <password>
# 停止同步
cancel_sync_queue [-p <vhost>] queue
# 更改群集节点的类型
change_cluster_node_type <disc|ram>
# 更改用户密码
change_password <username> <password>
# 清除参数值
clear_global_parameter <key>
# 清除运算策略,与clear_policy相同
clear_operator_policy [-p <vhost>] <key>
# 清除参数
clear_parameter [-p <vhost>] <component_name> <key>
# 清除密码
clear_password <username>
# 清除用户某个虚拟主机权限
clear_permissions [-p vhost] <username>
# 清除虚拟主机策略
clear_policy [-p <vhost>] <key>
clear_topic_permissions [-p vhost] <username> [<exchange>]
clear_vhost_limits [-p <vhost>]
close_all_connections [-p <vhost> --limit <limit>] [-n <node> --global] [--per-connection-delay <delay>] <explanation>
close_connection <connectionpid> <explanation>
# 显示集群所有节点状态
cluster_status
decode value passphrase [--cipher cipher] [--hash hash] [--iterations iterations]
# 删除用户
delete_user <username>
# 删除虚拟主机
delete_vhost <vhost>
encode value passphrase [--cipher cipher] [--hash hash] [--iterations iterations]
environment
eval <expr>
exec <expr> [--offline]
force_boot
force_reset
forget_cluster_node [--offline] <existing_cluster_member_node>
help <command>
hipe_compile <directory>
join_cluster [--disc|--ram] <existing_cluster_member_node>
# 列出虚拟主机binding信息
list_bindings [-p <vhost>] [<bindinginfoitem> ...]
list_channels [<channelinfoitem> ...]
list_ciphers
list_connections [<connectioninfoitem> ...]
list_consumers [-p vhost] [<consumerinfoitem> ...]
list_exchanges [-p <vhost>] [<exchangeinfoitem> ...]
list_global_parameters
list_hashes
list_operator_policies [-p <vhost>]
list_parameters [-p <vhost>]
list_permissions [-p <vhost>]
list_policies [-p <vhost>]
list_queues [-p <vhost>] [--online] [--offline] [--local] [<queueinfoitem> ...]
list_topic_permissions [-p <vhost>]
list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>] [<unresponsiveq_ueueinfoitem> ...]
# 列出用户虚拟主机权限
list_user_permissions <username>
list_user_topic_permissions <username>
# 列出所有用户
list_users
list_vhost_limits [-p <vhost>] [--global]
# 列出所有虚拟主机
list_vhosts [<vhostinfoitem> ...]
# 验证节点状态
node_health_check
# 清除队列信息
purge_queue [-p vhost] queue
rename_cluster_node <oldnode1> <newnode1> [oldnode2] [newnode2] ...
# 打印服务器运行报告
report
# 清空节点所有信息,包含用户,虚拟主机等
reset
# 清空虚拟主机下所有信息
restart_vhost [-p <vhost>]
# 日志切割
rotate_logs
# 设置集群名称
set_cluster_name <name>
set_disk_free_limit <disk_limit>
set_disk_free_limit mem_relative <fraction>
# 设置全局运行时参数
set_global_parameter <name> <value>
set_operator_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
set_parameter [-p <vhost>] <component_name> <name> <value>
# 设置用户虚拟主机权限
set_permissions [-p <vhost>] <username> <conf> <write> <read>
set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
set_topic_permissions [-p <vhost>] <username> <exchange> <write_pattern> <read_pattern>
# 设置用户标签
set_user_tags <username> <tag> [...]
set_vhost_limits [-p <vhost>] <definition>
set_vm_memory_high_watermark <fraction>
set_vm_memory_high_watermark absolute <value>
shutdown
start_app
status
stop [<pidfile>]
stop_app
sync_queue [-p <vhost>] queue
trace_off [-p <vhost>]
trace_on [-p <vhost>]
update_cluster_nodes <existing_cluster_member_node_to_seed_from>
wait [<pid_file>] [--pid|-P <pid>]
python实列
准备
# 创建虚拟主机
rabbitmqctl add_vhost bbs
# 创建用户
rabbitmqctl add_user bbs ad1211
# 添加标签
rabbitmqctl set_user_tags bbs management
# 设置权限
rabbitmqctl set_permissions -p bbs bbs ".*" ".*" ".*"
# 安装 python 扩展
pip install pika
消费者
# -*- coding: utf-8 -*-
__author__ = ‘fan‘
import pika
rabbitmq_host = ‘172.16.50.104‘
rabbitmq_user = ‘bbs‘
rabbitmq_pwd = ‘ad1211‘
rabbitmq_vhost = ‘bbs‘
rabbitmq_queue = ‘info‘
rabbitmq_exchange = ‘bbs‘
rabbitmq_conn = pika.BlockingConnection(
pika.ConnectionParameters(
rabbitmq_host,
credentials=pika.PlainCredentials(rabbitmq_user, rabbitmq_pwd),
virtual_host=rabbitmq_vhost
)
).channel()
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
#消费者
rabbitmq_conn.basic_consume(callback,
queue=rabbitmq_queue,
no_ack=True)
rabbitmq_conn.start_consuming()
生产者
# -*- coding: utf-8 -*-
__author__ = ‘fan‘
import pika
import sys
body = sys.argv[1]
rabbitmq_host = ‘172.16.50.104‘
rabbitmq_user = ‘bbs‘
rabbitmq_pwd = ‘ad1211‘
rabbitmq_vhost = ‘bbs‘
rabbitmq_exchange = ‘bbs‘
rabbitmq_exchange_type = ‘direct‘
rabbitmq_quene = ‘info‘
rabbitmq_routing_key = ‘info‘
rabbitmq_conn = pika.BlockingConnection(
pika.ConnectionParameters(
rabbitmq_host,
credentials=pika.PlainCredentials(rabbitmq_user, rabbitmq_pwd),
virtual_host=rabbitmq_vhost
)
).channel()
# 创建 exchange
rabbitmq_conn.exchange_declare(exchange=rabbitmq_exchange,exchange_type=rabbitmq_exchange_type,durable=True)
# 创建 queue
rabbitmq_conn.queue_declare(queue=rabbitmq_quene,durable=True)
# queue 与 exchange bind
rabbitmq_conn.queue_bind(queue=rabbitmq_quene,exchange=rabbitmq_exchange,routing_key=rabbitmq_routing_key)
rabbitmq_conn.basic_publish(exchange=rabbitmq_exchange,
routing_key=rabbitmq_routing_key,
body=body
)
运行
先运行生产者 , 在运行消费者
# python production.py "hellow word!"
# python consumption.py
[x] ‘info‘:‘hello word‘
以上是关于Rabbitmq简单使用的主要内容,如果未能解决你的问题,请参考以下文章