python2.0_s12_day10_rabbitMQ使用介绍

Posted zhming

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python2.0_s12_day10_rabbitMQ使用介绍相关的知识,希望对你有一定的参考价值。

RabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ安装
        Linux
        安装配置epel源
           $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

        安装erlang,因为RabiitMQ用erlang语言写的
           $ yum -y install erlang

        安装RabbitMQ
           $ yum -y install rabbitmq-server
       注意:service rabbitmq-server start/stop
        MAC
        安装 http://www.rabbitmq.com/install-standalone-mac.html
    安装API
        pip install pika #pika是官方提供的,当然还有其他的
        or
        easy_install pika
        or
        源码
         
        https://pypi.python.org/pypi/pika
   一、实现最简单的队列通信
send端
 1         #!/usr/bin/evn python3.5
 2         #__author__:"ted.zhou"
 3         ‘‘‘
 4         zibbitMQ最简单的队列通信代码范例
 5         ‘‘‘
 6         import pika
 7 
 8         connection = pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接一个rabbitMQ,返回连接成功后的实例
 9         channel = connection.channel()      # 创建一个管道,用于传输各种队列.--连接成功后,还不能直接使用,需要在这个连接的实例中创建一个管道.
10 
11         # 声明一个queue
12         # channel.queue_declare(queue=hello) # 在这个管道里声明一个队列 ,队列的名称为"hello"
13         ‘‘‘
14         使用pika连接并创建队列需要三步
15         1.使用pika.BlockingConnection() 创建一个连接
16         2.创建一个管道
17         3.声明一个队列
18         ‘‘‘
19 
20         # 紧接着就可以通过这个管道发送内容了,在发送时,必须有三个参数
21         # exchege = ‘‘ 这个在发布订阅模式时,会用到,具体高级用法会提到,这里默认给‘‘,这样它内部还是会调用一个默认类型.
22         # routing_key = hello,这里的routing_key 是选择通过哪个队列发送
23         # body = Hello World! 要发送的内容
24         channel.basic_publish(exchange=‘‘,
25                               routing_key=hello,   # 接收端不是这个参数,而是queue
26                               body=Hello World!)
27 
28         print(" [x] Sent ‘Hello World!‘") # 生产者端打印发送信息,表示代码已经执行到这里
29         connection.close() # 关闭这个连接

receive端
 1  #!/usr/bin/env python3.5
 2         __author__ = "ted.zhou"
 3         ‘‘‘
 4         python使用zabbitMQ实现最简单的队列通信之接收端代码范例
 5         ‘‘‘
 6 
 7         import pika
 8 
 9         # 使用pika模块,连接到指定的rabbitMQ服务器
10         connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
11 
12         # 连接创建成功后,实例化一个管道
13         channel = connection.channel()
14 
15         # 然后在管道中声明一个队列,表示我这个管道里可以跑 hello这个队列,
16         # 我们在发送端声明了一个hello 的queue,这里为什么还要声明一次 ,因为当接收端先启动的时候,此时不声明,下面代码在接收时会报错.
17         # 当然发送端如果先启动了,这里声明也不会报错.
18 
19 
20         channel.queue_declare(queue=hello)
21 
22         # 紧接着我们就要进行接收队列里的消息,但是接收之前我们要知道这个消息我们收来做哪些操作呢,只接过来没啥意义.
23         # 所以定义一个callback函数
24 
25         # 这里注意,接收端定义的callback函数,一定要带三个参数
26         # 1.ch 2.method 3.properties 4.其后才是信息主题body
27         # 前面3个参数是做什么的,暂时用不到,后面高级的用法会举例
28         def callback(ch,method,properties,body):
29             print("[x] Received %r" %body)
30 
31         # 紧接着定义接收,定义完接收并不是直接就接收了,这个和发送端的basic_publish()方法不太一样,basic_publish()是直接就发送了,而接收basic_consume()方法定义后,还需要调用一个start方法
32         # 定义管道的接收方法.
33         # 参数介绍: queue 指定 接收的队列名称 , no_ack=True 是定义此接收方法是否要确认执行完成,如果为True,
34         # 说明不需要验证执行状态,也就是说当一个callback需要处理6分钟,当5分钟时程序卡死了,此消息也就没了,如果为False,5分钟卡死后,消息在队列中依然存在
35         channel.basic_consume(callback,
36                               queue=hello,   # 发送端不是这个参数,而是routing_key
37                               no_ack=True)
38         print( [*] Waiting for messages. To exit press CTRL+C)
39         channel.start_consuming() # 开启接收,没有就阻塞

晋级:
二、队列持久化&消息持久化:
我们上面的例子,在发送端管道cannel中声明了‘hello‘队列.
为了避免当接收端先启动的情况下,因为发送端还未运行程序导致rabbitMQ服务中没有‘hello‘队列,导致接收端程序报错,所以在接收端中的管道也声明了‘hello‘队列
无论是发送端还是接收端在管道cannel中声明了‘hello‘队列,在rabbitMQ服务器中,你都可以通过命令查看此队列的信息:
MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
那么问题来了,当发送端发送了很多信息在‘hello‘队列中,接收端还没启动呢,这时候所有的信息都存在hello队列,如下这种情况:
MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues
Listing queues ...
hello 7
如果此时rabbitMQ服务器挂了,或者重启了,会有两个问题:1.这个‘hello‘队列还存在吗? 2.‘hello‘队列中的信息还存在吗?
我们做下测试:
        停止rabbitMQ服务
            MacBook-Pro:~ tedzhou$ sudo rabbitmqctl stop
            Stopping and halting node [email protected] ...
        启动rabbitMQ服务
            MacBook-Pro:~ tedzhou$ sudo rabbitmq-server
        查看rabbitMQ的队列
            MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues
            Listing queues ...
        结果证明了:
            1.队列没有了
            2.消息更没有了
        整成业务中,我们肯定希望这些队列和消息能够保留下来.所以我们要解决两个问题.
1.持久化队列
2.持久化消息
1.队列持久化代码范例
要在声明队列的时候,加上队列持久化参数
channel.queue_declare(queue=‘hello‘, durable=True)
2.消息持久化代码范例
要在发送消息的代码部分,加上消息持久化的属性,delivery_mode=2就是说这个消息持久化消息,直到消费掉.(老实说delivery_mode有30多种,常用的就这一种)
channel.basic_publish(exchange=‘‘,
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
发送端要在声明队列和发送消息中更改代码
 1         #!/usr/bin/evn python3.5
 2         #__author__:"ted.zhou"
 3         ‘‘‘
 4         zibbitMQ最简单的队列通信代码范例
 5         ‘‘‘
 6         import pika
 7 
 8         connection = pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接一个rabbitMQ,返回连接成功后的实例
 9         channel = connection.channel()      # 创建一个管道,用于传输各种队列.--连接成功后,还不能直接使用,需要在这个连接的实例中创建一个管道.
10 
11         # 声明一个queue
12         # channel.queue_declare(queue=hello) # 在这个管道里声明一个队列 ,队列的名称为"hello"
13         channel.queue_declare(queue=hello,durable=True) # durable=True 设置此队列持久化属性为True
14         ‘‘‘
15         使用pika连接并创建队列需要三步
16         1.使用pika.BlockingConnection() 创建一个连接
17         2.创建一个管道
18         3.声明一个队列
19         ‘‘‘
20 
21         # 紧接着就可以通过这个管道发送内容了,在发送时,必须有三个参数
22         # exchege = ‘‘ 这个在发布订阅模式时,会用到,具体高级用法会提到,这里默认给‘‘,这样它内部还是会调用一个默认类型.
23         # routing_key = hello,这里的routing_key 是选择通过哪个队列发送
24         # body = Hello World! 要发送的内容
25         channel.basic_publish(exchange=‘‘,
26                               routing_key=hello,   # 接收端不是这个参数,而是queue
27                               body=Hello World!,
28                               properties=pika.BasicProperties(          # 消息持久化加入的参数
29                                       delivery_mode = 2,)
30                               )
31 
32         print(" [x] Sent ‘Hello World!‘") # 生产者端打印发送信息,表示代码已经执行到这里
33         connection.close() # 关闭这个连接

接收端1.需要在声明队列中设置持久化属性,2.它要在callback中获得接收到的数据de
 1          #!/usr/bin/env python3.5
 2         __author__ = "ted.zhou"
 3         ‘‘‘
 4         python使用zabbitMQ实现最简单的队列通信之接收端代码范例
 5         ‘‘‘
 6 
 7         import pika
 8 
 9         # 使用pika模块,连接到指定的rabbitMQ服务器
10         connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
11 
12         # 连接创建成功后,实例化一个管道
13         channel = connection.channel()
14 
15         # 然后在管道中声明一个队列,表示我这个管道里可以跑 hello这个队列,
16         # 我们在发送端声明了一个hello 的queue,这里为什么还要声明一次 ,因为当接收端先启动的时候,此时不声明,下面代码在接收时会报错.
17         # 当然发送端如果先启动了,这里声明也不会报错.
18 
19 
20         #channel.queue_declare(queue=hello)
21         channel.queue_declare(queue=hello,durable=True) #durable=True 设置此队列持久化属性为True
22 
23 
24         # 紧接着我们就要进行接收队列里的消息,但是接收之前我们要知道这个消息我们收来做哪些操作呢,只接过来没啥意义.
25         # 所以定义一个callback函数
26 
27         # 这里注意,接收端定义的callback函数,一定要带三个参数
28         # 1.ch 2.method 3.properties 4.其后才是信息主题body
29         # 前面3个参数是做什么的,暂时用不到,后面高级的用法会举例
30         def callback(ch,method,properties,body):
31             print("[x] Received %r" %body)
32             time.sleep(body.count(b.))
33             print(" [x] Done")
34             ch.basic_ack(delivery_tag = method.delivery_tag)    # 获得delivery_tag,具体啥一起,老师没说,就说咱加上!
35 
36         # 紧接着定义接收,定义完接收并不是直接就接收了,这个和发送端的basic_publish()方法不太一样,basic_publish()是直接就发送了,而接收basic_consume()方法定义后,还需要调用一个start方法
37         # 定义管道的接收方法.
38         # 参数介绍: queue 指定 接收的队列名称 , no_ack=True 是定义此接收方法是否要确认执行完成,如果为True,
39         # 说明不需要验证执行状态,也就是说当一个callback需要处理6分钟,当5分钟时程序卡死了,此消息也就没了,如果为False,5分钟卡死后,消息在队列中依然存在
40         channel.basic_consume(callback,
41                               queue=hello)   # 发送端不是这个参数,而是routing_key
42 
43         print( [*] Waiting for messages. To exit press CTRL+C)
44         channel.start_consuming() # 开启接收,没有就阻塞

我们通过查看rabbitMQ里的队列情况,来验证下是否持久化成功.
 1 首先只运行发送端程序,运行6遍.
 2         查看队列:
 3             MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues
 4             Listing queues ...
 5             hello  6
 6         停掉服务:
 7             MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  stop
 8             Stopping and halting node [email protected] ...
 9         开启服务:
10             MacBook-Pro:~ tedzhou$sudo rabbitmq-server &
11         再次查看队列:
12             MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues
13             Listing queues ...
14             hello  6

验证结果: 持久化 队列&消息成功.

用法晋级2
.Work Queues
技术分享

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

消息生产者代码
 1  import pika
 2          
 3         connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                        localhost))
 5         channel = connection.channel()
 6          
 7         #声明queue
 8         channel.queue_declare(queue=task_queue)
 9          
10         #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
11         import sys
12          
13         message =  .join(sys.argv[1:]) or "Hello World!"
14         channel.basic_publish(exchange=‘‘,
15                               routing_key=task_queue,
16                               body=message,
17                               properties=pika.BasicProperties(
18                               delivery_mode = 2, # make message persistent
19                               ))
20         print(" [x] Sent %r" % message)
21         connection.close()
    消费者代码:
 1  import pika,time
 2          
 3         connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                        localhost))
 5         channel = connection.channel()
 6          
 7          
 8          
 9         def callback(ch, method, properties, body):
10             print(" [x] Received %r" % body)
11             time.sleep(body.count(b.))
12             print(" [x] Done")
13             ch.basic_ack(delivery_tag = method.delivery_tag)
14          
15          
16         channel.basic_consume(callback,
17                               queue=task_queue,
18                               )
19          
20         print( [*] Waiting for messages. To exit press CTRL+C)
21         channel.start_consuming()
    当你多次运行一个生产者的代码,而运行3个消费者的代码,你会发现消息会轮询3个消费者程序,也就是消费者会依次接收到代码,这个就像简单的负载均衡.
那么问题来了,加入运行消费者程序的3台机器的配置不一样,好的1台,消费一条消息需要1分钟, 性能差的机器要10分钟,那么前面说到的负载均衡就会导致,差的严重影响效率.
我们在LVS这类负载均衡是可以设置权重,同样消费者在接收消息时也可以设置相应的功能,但不是权重,它比权重更人性化,它可以保证一个消费者程序,同时只能保证1个信息在消费,当然也可以设置同一时刻保证在消费2个信息

           技术分享


具体实现代码如下:
生产者代码不变:
 1             #!/usr/bin/env python
 2             import pika
 3             import sys
 4              
 5             connection = pika.BlockingConnection(pika.ConnectionParameters(
 6                     host=localhost))
 7             channel = connection.channel()
 8              
 9             channel.queue_declare(queue=task_queue, durable=True)
10              
11             message =  .join(sys.argv[1:]) or "Hello World!"
12             channel.basic_publish(exchange=‘‘,
13                                   routing_key=task_queue,
14                                   body=message,
15                                   properties=pika.BasicProperties(
16                                      delivery_mode = 2, # make message persistent
17                                   ))
18             print(" [x] Sent %r" % message)
19             connection.close()
        消费者代码加入channel.basic_qos(prefetch_count=1),代码如下:
 1             #!/usr/bin/env python
 2             import pika
 3             import time
 4              
 5             connection = pika.BlockingConnection(pika.ConnectionParameters(
 6                     host=localhost))
 7             channel = connection.channel()
 8              
 9             channel.queue_declare(queue=task_queue, durable=True)
10             print( [*] Waiting for messages. To exit press CTRL+C)
11              
12             def callback(ch, method, properties, body):
13                 print(" [x] Received %r" % body)
14                 time.sleep(body.count(b.))
15                 print(" [x] Done")
16                 ch.basic_ack(delivery_tag = method.delivery_tag)
17              
18             channel.basic_qos(prefetch_count=1)  #表示同意时刻保证客户端程序只处理一个消息
19             channel.basic_consume(callback,
20                                   queue=task_queue)
21              
22             channel.start_consuming()
    rabbitMQ高级用法
四、Publish\Subscribe(消息发布\订阅) 
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
   表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue

              技术分享    



以上是关于python2.0_s12_day10_rabbitMQ使用介绍的主要内容,如果未能解决你的问题,请参考以下文章

python2.0_s12_day9_协程&Gevent协程

python2.0_s12_day21_web聊天室一

python2.0_s12_day9之day8遗留知识(queue队列&生产者消费者模型)

python2.0 s12 day8 _ python线程

python2.0 s12 day8 _ 堡垒机前戏paramiko模块

python2.0_day19_充分使用Django_form实现前端操作后台数据库