python2.0_s12_day10_rabbitMQ使用介绍
Posted zhming
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python2.0_s12_day10_rabbitMQ使用介绍相关的知识,希望对你有一定的参考价值。
RabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ安装
Linux 安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang,因为RabiitMQ用erlang语言写的 $ yum -y install erlang 安装RabbitMQ $ yum -y install rabbitmq-server 注意:service rabbitmq-server start/stop
MAC 安装 http://www.rabbitmq.com/install-standalone-mac.html
安装API
pip install pika #pika是官方提供的,当然还有其他的 or easy_install pika or 源码 https://pypi.python.org/pypi/pika
一、实现最简单的队列通信
send端
1 #!/usr/bin/evn python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 zibbitMQ最简单的队列通信代码范例 5 ‘‘‘ 6 import pika 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) # 连接一个rabbitMQ,返回连接成功后的实例 9 channel = connection.channel() # 创建一个管道,用于传输各种队列.--连接成功后,还不能直接使用,需要在这个连接的实例中创建一个管道. 10 11 # 声明一个queue 12 # channel.queue_declare(queue=‘hello‘) # 在这个管道里声明一个队列 ,队列的名称为"hello" 13 ‘‘‘ 14 使用pika连接并创建队列需要三步 15 1.使用pika.BlockingConnection() 创建一个连接 16 2.创建一个管道 17 3.声明一个队列 18 ‘‘‘ 19 20 # 紧接着就可以通过这个管道发送内容了,在发送时,必须有三个参数 21 # exchege = ‘‘ 这个在发布订阅模式时,会用到,具体高级用法会提到,这里默认给‘‘,这样它内部还是会调用一个默认类型. 22 # routing_key = ‘hello‘,这里的routing_key 是选择通过哪个队列发送 23 # body = ‘Hello World!‘ 要发送的内容 24 channel.basic_publish(exchange=‘‘, 25 routing_key=‘hello‘, # 接收端不是这个参数,而是queue 26 body=‘Hello World!‘) 27 28 print(" [x] Sent ‘Hello World!‘") # 生产者端打印发送信息,表示代码已经执行到这里 29 connection.close() # 关闭这个连接
receive端
1 #!/usr/bin/env python3.5 2 __author__ = "ted.zhou" 3 ‘‘‘ 4 python使用zabbitMQ实现最简单的队列通信之接收端代码范例 5 ‘‘‘ 6 7 import pika 8 9 # 使用pika模块,连接到指定的rabbitMQ服务器 10 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 11 12 # 连接创建成功后,实例化一个管道 13 channel = connection.channel() 14 15 # 然后在管道中声明一个队列,表示我这个管道里可以跑 ‘hello‘这个队列, 16 # 我们在发送端声明了一个‘hello‘ 的queue,这里为什么还要声明一次 ,因为当接收端先启动的时候,此时不声明,下面代码在接收时会报错. 17 # 当然发送端如果先启动了,这里声明也不会报错. 18 19 20 channel.queue_declare(queue=‘hello‘) 21 22 # 紧接着我们就要进行接收队列里的消息,但是接收之前我们要知道这个消息我们收来做哪些操作呢,只接过来没啥意义. 23 # 所以定义一个callback函数 24 25 # 这里注意,接收端定义的callback函数,一定要带三个参数 26 # 1.ch 2.method 3.properties 4.其后才是信息主题body 27 # 前面3个参数是做什么的,暂时用不到,后面高级的用法会举例 28 def callback(ch,method,properties,body): 29 print("[x] Received %r" %body) 30 31 # 紧接着定义接收,定义完接收并不是直接就接收了,这个和发送端的basic_publish()方法不太一样,basic_publish()是直接就发送了,而接收basic_consume()方法定义后,还需要调用一个start方法 32 # 定义管道的接收方法. 33 # 参数介绍: queue 指定 接收的队列名称 , no_ack=True 是定义此接收方法是否要确认执行完成,如果为True, 34 # 说明不需要验证执行状态,也就是说当一个callback需要处理6分钟,当5分钟时程序卡死了,此消息也就没了,如果为False,5分钟卡死后,消息在队列中依然存在 35 channel.basic_consume(callback, 36 queue=‘hello‘, # 发送端不是这个参数,而是routing_key 37 no_ack=True) 38 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 39 channel.start_consuming() # 开启接收,没有就阻塞
晋级:
二、队列持久化&消息持久化:
我们上面的例子,在发送端管道cannel中声明了‘hello‘队列.
为了避免当接收端先启动的情况下,因为发送端还未运行程序导致rabbitMQ服务中没有‘hello‘队列,导致接收端程序报错,所以在接收端中的管道也声明了‘hello‘队列
无论是发送端还是接收端在管道cannel中声明了‘hello‘队列,在rabbitMQ服务器中,你都可以通过命令查看此队列的信息:
MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
那么问题来了,当发送端发送了很多信息在‘hello‘队列中,接收端还没启动呢,这时候所有的信息都存在hello队列,如下这种情况:
MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues
Listing queues ...
hello 7
如果此时rabbitMQ服务器挂了,或者重启了,会有两个问题:1.这个‘hello‘队列还存在吗? 2.‘hello‘队列中的信息还存在吗?
我们做下测试:
停止rabbitMQ服务 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl stop Stopping and halting node ‘[email protected]‘ ... 启动rabbitMQ服务 MacBook-Pro:~ tedzhou$ sudo rabbitmq-server 查看rabbitMQ的队列 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues Listing queues ... 结果证明了: 1.队列没有了 2.消息更没有了
整成业务中,我们肯定希望这些队列和消息能够保留下来.所以我们要解决两个问题.
1.持久化队列
2.持久化消息
1.队列持久化代码范例
要在声明队列的时候,加上队列持久化参数
channel.queue_declare(queue=‘hello‘, durable=True)
2.消息持久化代码范例
要在发送消息的代码部分,加上消息持久化的属性,delivery_mode=2就是说这个消息持久化消息,直到消费掉.(老实说delivery_mode有30多种,常用的就这一种)
channel.basic_publish(exchange=‘‘,
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
发送端要在声明队列和发送消息中更改代码
1 #!/usr/bin/evn python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 zibbitMQ最简单的队列通信代码范例 5 ‘‘‘ 6 import pika 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) # 连接一个rabbitMQ,返回连接成功后的实例 9 channel = connection.channel() # 创建一个管道,用于传输各种队列.--连接成功后,还不能直接使用,需要在这个连接的实例中创建一个管道. 10 11 # 声明一个queue 12 # channel.queue_declare(queue=‘hello‘) # 在这个管道里声明一个队列 ,队列的名称为"hello" 13 channel.queue_declare(queue=‘hello‘,durable=True) # durable=True 设置此队列持久化属性为True 14 ‘‘‘ 15 使用pika连接并创建队列需要三步 16 1.使用pika.BlockingConnection() 创建一个连接 17 2.创建一个管道 18 3.声明一个队列 19 ‘‘‘ 20 21 # 紧接着就可以通过这个管道发送内容了,在发送时,必须有三个参数 22 # exchege = ‘‘ 这个在发布订阅模式时,会用到,具体高级用法会提到,这里默认给‘‘,这样它内部还是会调用一个默认类型. 23 # routing_key = ‘hello‘,这里的routing_key 是选择通过哪个队列发送 24 # body = ‘Hello World!‘ 要发送的内容 25 channel.basic_publish(exchange=‘‘, 26 routing_key=‘hello‘, # 接收端不是这个参数,而是queue 27 body=‘Hello World!‘, 28 properties=pika.BasicProperties( # 消息持久化加入的参数 29 delivery_mode = 2,) 30 ) 31 32 print(" [x] Sent ‘Hello World!‘") # 生产者端打印发送信息,表示代码已经执行到这里 33 connection.close() # 关闭这个连接
接收端1.需要在声明队列中设置持久化属性,2.它要在callback中获得接收到的数据de
1 #!/usr/bin/env python3.5 2 __author__ = "ted.zhou" 3 ‘‘‘ 4 python使用zabbitMQ实现最简单的队列通信之接收端代码范例 5 ‘‘‘ 6 7 import pika 8 9 # 使用pika模块,连接到指定的rabbitMQ服务器 10 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 11 12 # 连接创建成功后,实例化一个管道 13 channel = connection.channel() 14 15 # 然后在管道中声明一个队列,表示我这个管道里可以跑 ‘hello‘这个队列, 16 # 我们在发送端声明了一个‘hello‘ 的queue,这里为什么还要声明一次 ,因为当接收端先启动的时候,此时不声明,下面代码在接收时会报错. 17 # 当然发送端如果先启动了,这里声明也不会报错. 18 19 20 #channel.queue_declare(queue=‘hello‘) 21 channel.queue_declare(queue=‘hello‘,durable=True) #durable=True 设置此队列持久化属性为True 22 23 24 # 紧接着我们就要进行接收队列里的消息,但是接收之前我们要知道这个消息我们收来做哪些操作呢,只接过来没啥意义. 25 # 所以定义一个callback函数 26 27 # 这里注意,接收端定义的callback函数,一定要带三个参数 28 # 1.ch 2.method 3.properties 4.其后才是信息主题body 29 # 前面3个参数是做什么的,暂时用不到,后面高级的用法会举例 30 def callback(ch,method,properties,body): 31 print("[x] Received %r" %body) 32 time.sleep(body.count(b‘.‘)) 33 print(" [x] Done") 34 ch.basic_ack(delivery_tag = method.delivery_tag) # 获得delivery_tag,具体啥一起,老师没说,就说咱加上! 35 36 # 紧接着定义接收,定义完接收并不是直接就接收了,这个和发送端的basic_publish()方法不太一样,basic_publish()是直接就发送了,而接收basic_consume()方法定义后,还需要调用一个start方法 37 # 定义管道的接收方法. 38 # 参数介绍: queue 指定 接收的队列名称 , no_ack=True 是定义此接收方法是否要确认执行完成,如果为True, 39 # 说明不需要验证执行状态,也就是说当一个callback需要处理6分钟,当5分钟时程序卡死了,此消息也就没了,如果为False,5分钟卡死后,消息在队列中依然存在 40 channel.basic_consume(callback, 41 queue=‘hello‘) # 发送端不是这个参数,而是routing_key 42 43 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 44 channel.start_consuming() # 开启接收,没有就阻塞
我们通过查看rabbitMQ里的队列情况,来验证下是否持久化成功.
1 首先只运行发送端程序,运行6遍. 2 查看队列: 3 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues 4 Listing queues ... 5 hello 6 6 停掉服务: 7 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl stop 8 Stopping and halting node ‘[email protected]‘ ... 9 开启服务: 10 MacBook-Pro:~ tedzhou$sudo rabbitmq-server & 11 再次查看队列: 12 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues 13 Listing queues ... 14 hello 6
验证结果: 持久化 队列&消息成功.
用法晋级2
三.Work Queues
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
消息生产者代码
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 7 #声明queue 8 channel.queue_declare(queue=‘task_queue‘) 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 import sys 12 13 message = ‘ ‘.join(sys.argv[1:]) or "Hello World!" 14 channel.basic_publish(exchange=‘‘, 15 routing_key=‘task_queue‘, 16 body=message, 17 properties=pika.BasicProperties( 18 delivery_mode = 2, # make message persistent 19 )) 20 print(" [x] Sent %r" % message) 21 connection.close()
消费者代码:
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(body.count(b‘.‘)) 12 print(" [x] Done") 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 16 channel.basic_consume(callback, 17 queue=‘task_queue‘, 18 ) 19 20 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 21 channel.start_consuming()
当你多次运行一个生产者的代码,而运行3个消费者的代码,你会发现消息会轮询3个消费者程序,也就是消费者会依次接收到代码,这个就像简单的负载均衡.
那么问题来了,加入运行消费者程序的3台机器的配置不一样,好的1台,消费一条消息需要1分钟, 性能差的机器要10分钟,那么前面说到的负载均衡就会导致,差的严重影响效率.
我们在LVS这类负载均衡是可以设置权重,同样消费者在接收消息时也可以设置相应的功能,但不是权重,它比权重更人性化,它可以保证一个消费者程序,同时只能保证1个信息在消费,当然也可以设置同一时刻保证在消费2个信息
具体实现代码如下:
生产者代码不变:
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host=‘localhost‘)) 7 channel = connection.channel() 8 9 channel.queue_declare(queue=‘task_queue‘, durable=True) 10 11 message = ‘ ‘.join(sys.argv[1:]) or "Hello World!" 12 channel.basic_publish(exchange=‘‘, 13 routing_key=‘task_queue‘, 14 body=message, 15 properties=pika.BasicProperties( 16 delivery_mode = 2, # make message persistent 17 )) 18 print(" [x] Sent %r" % message) 19 connection.close()
消费者代码加入channel.basic_qos(prefetch_count=1),代码如下:
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host=‘localhost‘)) 7 channel = connection.channel() 8 9 channel.queue_declare(queue=‘task_queue‘, durable=True) 10 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 11 12 def callback(ch, method, properties, body): 13 print(" [x] Received %r" % body) 14 time.sleep(body.count(b‘.‘)) 15 print(" [x] Done") 16 ch.basic_ack(delivery_tag = method.delivery_tag) 17 18 channel.basic_qos(prefetch_count=1) #表示同意时刻保证客户端程序只处理一个消息 19 channel.basic_consume(callback, 20 queue=‘task_queue‘) 21 22 channel.start_consuming()
rabbitMQ高级用法
四、Publish\Subscribe(消息发布\订阅)
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
以上是关于python2.0_s12_day10_rabbitMQ使用介绍的主要内容,如果未能解决你的问题,请参考以下文章
python2.0_s12_day9_协程&Gevent协程
python2.0_s12_day9之day8遗留知识(queue队列&生产者消费者模型)