Rabbitmq 消息队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq 消息队列相关的知识,希望对你有一定的参考价值。
RabbitMQ 消息队列介绍RabbitMQ是一种消息队列,与线程queue和进程QUEUE作用是一样的。
RabbitMQ是一个中间程序,可以实现不同进程之间的通信(比如python和Java之间,QQ和Word之间等);
普通情况下A进程与B进程之间通信,两者之间需要建立很多连接和单独写一些代码,但是使用RabbitMQ的话就可以实现帮助不同进程之间的数据通信。
A进程交给RabbitMQ,RabbitMQ在交给B,同样B交给RabbitMQ,RabbitMQ在交给A,RabbitMQ可以实现A与B进程之间的连接和信息转换。
使用RabbitMQ可以实现很多个独立进程之间的交互,所有其他独立进程都可以用RabbitMQ作为中间程序。
py 消息队列:
线程 queue(同一进程下线程之间进行交互)
进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)
如果是两个完全独立的python程序,也是不能用上面两个queue进行交互的,或者和其他语言交互有哪些实现方式呢。
【Disk、Socket、其他中间件】这里中间件不仅可以支持两个程序之间交互,可以支持多个程序,可以维护好多个程序的队列。
虽然可以通过硬盘的方式实现多个独立进程交互,但是硬盘速度比较慢,而RabbitMQ则能够很好的、快速的帮助两个独立进程实现交互。
像这种公共的中间件有好多成熟的产品:
RabbitMQ
ZeroMQ
ActiveMQ
……
RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha
可以维护很多的队列
其中pika是RabbitMQ常用的模块
RabbitMQ 教程官网:http://www.rabbitmq.com/getstarted.html
几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
![image_1cfq381mj7341i1b4lnitk1h3t9.png-178.4kB][1]
RabbitMQ不像之前学的python Queue都在一个队列里实现交互,RabbitMQ有多个队列(图中红色部分代表队列),每个队列都可以将消息发给多个接收端(C是接收端,P是生产消息端)
RabbitMQ基本示例.
1、Rabbitmq 安装
Windos系统
pip install pika
ubuntu系统
install rabbitmq-server # 直接搞定
以下centos系统
1)Install Erlang
# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang
2)Install RabbitMQ Server
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm
3)use RabbitMQ Server
chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start
rabbitmq已经开启,等待传输
2、基本示例
发送端 producer
import pika
# 建立一个实例;相当于建立一个socket。
#通过ctrl+ConnectionParameters可以看到能传很多参数,如果远程还可以传用户名密码。
connection = pika.BlockingConnection(
pika.ConnectionParameters(‘localhost‘,5672) # 默认端口5672,可不写
)
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明queue
channel.queue_declare(queue=‘hello‘)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘, # queue名字,将消息发给hello这个queue
body=‘Hello World!‘) # 消息内容
print(" [x] Sent ‘Hello World!‘")
connection.close() # 发完消息后关闭队列
执行结果:
[x] Sent ‘Hello World!‘
注意一定要开启rabbitmq,否则会报错
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
# 声明管道
channel = connection.channel()
# 为什么又声明了一个‘hello’队列?
# 如果这个queue确定已经声明了,可以不声明。但是你不知道是生产者还是消费者先运行,所以要声明两次。如果消费者没声明,且消费者先运行的话,就会报错。
# 生产者先声明queue,消费者不声明,但是消费者后运行就不会报错。
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body): # 四个参数为标准格式
print(ch, method, properties) # 打印看一下是什么
# ch是管道内存对象地址;method是内容相关信息 properties后面讲 body消息内容
print(" [x] Received %r" % body)
#time.sleep(15)
#ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume( # 消费消息
‘hello‘, # 你要从哪个队列里收消息
callback, # 如果收到消息,就调用callback函数来处理消息 # 注意调用的函数以前模块是放在形参第一个位置的,后面修改到第二个位置了,如果放错位置会报错
# no_ack=True # 写的话,如果接收消息,机器宕机消息就丢了
# 一般不写。宕机则生产者检测到发给其他消费者
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming() # 开始消费消息(开始接收消息,一直收,如果没消息就卡主在这里了)
执行结果:
[*] Waiting for messages. To exit press CTRL+C
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f715d76f128> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver([‘consumer_tag=ctag1.b728277178e746118699d5b4302a0314‘, ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=hello‘])> <BasicProperties>
[x] Received b‘Hello World!‘
收到了bytes格式的 Hello World!
消费者(接收端)这边看到已经卡主了
如果此时单独在运行一下生产者(发送端),直接可以从消费者看到新收到的消息
rabbitmq 消息分发轮询
重新开启rabbitmq
运行三个接收者(消费者)
运行发送者,可以看到被第一个接收者给收到信息了
第二次运行发送者,第二个接收者收到信息了
第三次运行发送者,第三个接收者收到信息了
上面几次运行说明了,依次的将信息发送每一个接收者
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
# 声明管道
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# 正常回调函数(callback)执行完成就表示信息接收完成,如果在还没执行完成时就出现异常就表示信息没有正常接收,比如断网、断电等,会导致信息不能正常接收。
# 下面sleep 60秒,在60秒之前就将该模块终止执行来模拟异常情况。
time.sleep(60)
#ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
‘hello‘,
callback,
# no_ack=True 表示不管消息是否接收(处理)完成,都不会回复确认消息
# 如果producer不关心 comsumer是否处理完,可以使用该参数
# 但是一般都不会使用它
# no_ack=True
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming() #
在centos中重新执行rabbitmq-server start来清空队列里的消息
然后在pycharm开启三个comsumer,在去运行等待接收消息
再去执行producer来发送消息,执行producer后,立即关闭第一个comsumer,这样消息就会因为第一个comsumer没接收成功跑到第二个comsumer去,以此类推。
关闭第二个comsumer,第三个comsumer收到信息
这张图是将三个comsumer同时都关闭了,这样三个comsumer都收不到消息,说明producer的消息没有被接收,此时再去开启第一个comsumer,这时第一个comsumer会将消息给接收过来。
我们将sleep注释掉,也是这种现象,这是因为comsumer并没有发送确认消息给producer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
# 声明管道
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
time.sleep(30)
ch.basic_ack(delivery_tag = method.delivery_tag) # 告诉生成者,消息处理完成
channel.basic_consume(
‘hello‘,
callback,
# no_ack=True
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming() #
此时的代码:当其中一个comsumer执行完成,并发送确认消息后再去中断,下一个comsumer就不会收到消息;反之,如果还没发送确认消息就中断了,那么消息就会被下一个comsumer接收到。
rabbitmq 消息持久化
如果producer端宕机,那么队列的数据也会消失;这样就需要让队列消息持久化
# durable=True 该代码只是将生成的队列持久化(不是消息),如果producer宕机,队列会存在,单消息会丢
# 要注意需要在producer端和 comsumer端都要 写durable=True
channel.queue_declare(queue=‘hello‘,durable=True)
在centos重新开启 rabbitmq-server start
在producer端
将producer代码执行三次,将三个消息放入队列
import pika
。
connection = pika.BlockingConnection(
pika.ConnectionParameters(‘localhost‘,5672)
)
channel = connection.channel()
channel.queue_declare(queue=‘hello‘,durable=True)
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘,
# 下面的代码是让消息持久化
properties = pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent ‘Hello World!‘")
connection.close()
将producer代码执行三次,将三个消息放入队列
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘,durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# time.sleep(30) #注释掉
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
‘hello‘,
callback
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming() #
以上是关于Rabbitmq 消息队列的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ学习笔记五:RabbitMQ之优先级消息队列
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码