rabbitMQ
Posted amyleell
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ相关的知识,希望对你有一定的参考价值。
rabbitMQ介绍及基本使用
官方文档看这里:
http://www.rabbitmq.com/getstarted.html
一、队列的介绍
1、什么是rabbitMQ?
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
本质:socket之间的通信。
2、其他的消息队列?
queue:内存中的,一般用于测试,将列表放入内存中。
redis列表:非专业队列,可以做队列。
rabbitMQ:专业队列,消息队列。可以做持久化,可以保证数据的安全。
zeroMQ:基于内存的队列,比rabbitMQ更快,因为是 将数据放到内存。
3、为什么要有消息队列?
1 处理生产者消费者不对等的关系。
可以根据生产者的多少(用户请求的多少),消费者动态的增加或减少。
2 进行数据通信:
- restfull api 传送的通过http协议发送的json格式数据
- webservice传送的通过http协议发送的xml格式数据(在rest api诞生之前做的web网站之间的进行数据交互是通过webservice做的,c#,java居多)
- rpc(远程服务调用),基于socket并使用自己封装的协议进行数据传输,做数据交互。
‘处理者’在消息队列的一端等候任务到来,一旦有人发送请求就会被立即接收,并做处理。在消息队列旁再临时创建一个队列,发送请求之后就在这个队列一端进行等候,并通知处理者将消息处理完之后就放到这个队列中。任务处理完之后处理者将结果放到队列中,等发送者拿到消息之后就删除临时创建的队列。一般在公司内部做交互,通过rpc来进行通信,各个公司使用的协议不一定相同。升级版:给待处理信息写一个函数名,并传参数,让服务端找到这个函数,通过反射执行,获取结果,然后将结果放到队列中。
二、rabbit MQ的安装
服务端安装:
安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang $ yum -y install erlang 安装RabbitMQ $ yum -y install rabbitmq-server
运行:
方式1:rabbitmq-server (hang住) ctrl+c停止
方式2:systemctl start rabbitmq-server(在后台进程运行)
默认无密码,如果有密码:
1 sudo rabbitmqctl add_user wupeiqi 123
# 设置用户为administrator角色
2 sudo rabbitmqctl set_user_tags peiqi administrator
# 设置权限
3 sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"
#重启:
4 systemctl restart rabbitmq-server
客户端:
pip3 install pika
三、使用
基于Queue实现生产者消费者模型
import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # 创建一个队列:s91 channel.queue_declare(queue=‘s91‘) # 向队列s91中发送一个 Hello World! channel.basic_publish(exchange=‘‘,routing_key=‘s91‘,body=‘66‘) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.queue_declare(queue=‘s91‘) def callback(ch, method, properties, body): print(body) channel.basic_consume(callback,queue=‘s91‘,no_ack=True) channel.start_consuming()
################生产者######### import pika #无密码 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() #创建队列 channel.queue_declare(queue=‘hello‘) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘) print(" [x] Sent ‘Hello World!‘") connection.close() ###############消费者################# #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) #进行回调 def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue=‘hello‘, no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # 创建一个队列:s91 channel.queue_declare(queue=‘s91‘) # 向队列s91中发送一个 Hello World! channel.basic_publish(exchange=‘‘,routing_key=‘s91‘,body=‘66‘) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # channel.queue_declare(queue=‘s91‘) def callback(ch, method, properties, body): print(body) ## 进行确认 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback,queue=‘s91‘,no_ack=False) channel.start_consuming()
2、durable 消息不丢失(服务端持久化)
就算服务端或客户端挂掉,也没有关系,RabbitMQ会重新将该任务添加到队列中。
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘s92‘, durable=True) channel.basic_publish(exchange=‘‘, routing_key=‘s92‘, body=‘Hello World!‘, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘s92‘, durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,queue=‘s92‘,no_ack=False) channel.start_consuming()
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘s92‘, durable=True) channel.basic_publish(exchange=‘‘, routing_key=‘s92‘, body=‘Hello World!‘, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘s92‘, durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue=‘s92‘,no_ack=False) channel.start_consuming()
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
可有多个订阅者
exchange type = fanout
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e1‘,exchange_type=‘fanout‘) message = "Hello World!" channel.basic_publish(exchange=‘e1‘,routing_key=‘‘,body=message) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e1‘,exchange_type=‘fanout‘) # 随机生成对列 名 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 让队列和e1绑定 channel.queue_bind(exchange=‘e1‘,queue=queue_name) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e2‘,exchange_type=‘direct‘) message = "Hello World!" channel.basic_publish(exchange=‘e2‘,routing_key=‘error‘,body=message) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e2‘,exchange_type=‘direct‘) # 随机生成对列名 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 让队列和e1绑定 channel.queue_bind(exchange=‘e2‘,queue=queue_name,routing_key=‘info‘) channel.queue_bind(exchange=‘e2‘,queue=queue_name,routing_key=‘error‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e2‘,exchange_type=‘direct‘) # 随机生成对列名 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 让队列和e1绑定 channel.queue_bind(exchange=‘e2‘,queue=queue_name,routing_key=‘error‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
1
2
3
|
发送者路由值 队列中 old.boy.python old. * - - 不匹配 old.boy.python old. # -- 匹配 |
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e3‘,exchange_type=‘topic‘) message = "Hello World!" channel.basic_publish(exchange=‘e3‘,routing_key=‘info.xx.uu‘,body=message) connection.close()
import pika credentials = pika.PlainCredentials("root","123") connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.13.92‘,credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange=‘e3‘,exchange_type=‘topic‘) # 随机生成对列名 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 让队列和e1绑定 channel.queue_bind(exchange=‘e3‘,queue=queue_name,routing_key=‘info.*‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
注意:
sudo rabbitmqctl add_user wupeiqi 123 # 设置用户为administrator角色 sudo rabbitmqctl set_user_tags wupeiqi administrator # 设置权限 sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*" # 然后重启rabbiMQ服务 sudo /etc/init.d/rabbitmq-server restart # 然后可以使用刚才的用户远程连接rabbitmq server了。 ------------------------------ credentials = pika.PlainCredentials("wupeiqi","123") connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.14.47‘,credentials=credentials)) 复制代码
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika from pika.adapters.blocking_connection import BlockingChannel credentials = pika.PlainCredentials("root", "123") conn = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.20‘, credentials=credentials)) # 超时时间 conn.add_timeout(5, lambda: channel.stop_consuming()) channel = conn.channel() channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.stop_consuming() channel.basic_consume(callback, queue=‘hello‘, no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
终极归纳:
使用:
a. 普通消息队列
b. 批量向多个队列中发送(订阅者发布者)
c. 根据关键字匹配向队列中发送
d. 模糊匹配向队列中发送
1. exchange的作用?
- exchange和队列进行绑定
- 用户向队列发送数据时,无序再找队列,直接向exchange中发送即可。
2. rabbitmq中有几种exchange?
- fanout,只要绑定就发
- dirct,确定关键字
- topic,模糊匹配
3. 消息持久化和ack
- 服务端(durable)
- 客户端(ack)
4. 超时
5. 消息顺序
6. 其他消息队列?
- queue
- redis列表
- kafka
- zeromq
7. 什么时候用过消息队列?
- 防止消息堆积(消息提醒)
- 订单处理(celery+消息队列)
以上是关于rabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
带着新人学springboot的应用07(springboot+RabbitMQ 下)