python第十一周:RabbitMQRedis
Posted 野生的马
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python第十一周:RabbitMQRedis相关的知识,希望对你有一定的参考价值。
Rabbit Mq消息队列
RabbitMQ能为你做些什么?
消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.
或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。
RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。
技术亮点
*可靠性
RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。
*灵活的路由
消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。
*集群
在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用。
*联合
对于服务器来说,它比集群需要更多的松散和非可靠链接。为此RabbitMQ提供了联合模型。
*高可用的队列
在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。
*多协议
RabbitMQ 支持多种消息协议的消息传递。
*广泛的客户端
只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。
*可视化管理工具
RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。
*追踪
如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。
*插件系统
RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。
远程连接Rabbit MQ server
首先要在rabbitmq server上创建一个用户
sudo rabbitmqct1 add_user alex alex3714
同时还要配置权限,允许从外面访问
sudo rabbimqct1 set_permissions -p / alex ".*" ".*" ".*"
set_permissions [-p vhost] {user} {write} {read}
vhost
The name of the virtual host to which to grant the user access, defaulting to /.
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names for which the user is granted configure permissions.
write
A regular expression matching resource names for which the user is granted write permissions.
read
A regular expression matching resource names for which the user is granted read permissions.
客户端连接的时候需要配置认证参数
credentials = pika.PlainCredentials("alex","alex3374")
connencion = pika.BlockingConnection(pika.ConnectionParameters(
"10.211.55.5",5627,"/",credentials))
channel = connection.channel()
实现一个最简单的队列通信:hello world程序
producer ------ consumer 通信过程
producer
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika #先建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #建立管道 channel = connection.channel() #声明队列 queue = channel.queue_declare(queue="alex") #队列的名字叫做alex #发送消息 channel.basic_publish( exchange="", #转发器 routing_key="alex", #将消息发送到叫做alex的队列里面 body="hello world", #消息的内容 ) print("send message: hello world") #关闭连接 connection.close() #output: #send message: hello world
consumer:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #建立管道 channel = connection.channel() #声明一个queue #由于生产者和消费者程序启动的顺序可能不同,如果哪一方先启动且 #没有声明一个queue,那么这方的程序就会报错,所以为了保险起见, #在两方都声明一个相同的queue queue = channel.queue_declare(queue="alex") #alex是queue的名字 def callback(ch,method,properties,body): \'\'\'回调函数,如果收到消息就会调用此函数 ch:管道对象的内存地址 method:消息、及传送对象的相关参数 properties:消息持久化及返回队列等参数信息 body:消息的内容 \'\'\' print("receive message: %s"%body) print("ch: %s"%ch) print("method: %s"%method) channel.basic_consume( callback, queue="alex", #从叫做alex的队列中去消息 ) print("waiting message from alex......") channel.start_consuming() #开始消费,若没有消息则会阻塞 #output: \'\'\' waiting message from alex...... receive message: b\'hello world\' ch: <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=(\'::1\', 11957, 0, 0)->(\'::1\', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> method: <Basic.Deliver([\'consumer_tag=ctag1.7c8c5280a34144d7b961405f8150ee9d\', \'delivery_tag=1\', \'exchange=\', \'redelivered=False\', \'routing_key=alex\'])> \'\'\'
消息持久化
一般情况下,如果Rabbit Mq的服务突然中断的话,那么已经声明的队列、发送的消息等都会丢失,如果想要保存队列和消息的话,就要用到消息持久化。
producer:
import pika #先建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #建立管道 channel = connection.channel() #声明队列 queue = channel.queue_declare(queue="alex",durable=True) #队列的名字叫做alex, #durable=True:make queue persistent #发送消息 channel.basic_publish( exchange="", #转发器 routing_key="alex", #将消息发送到叫做alex的队列里面 body="hello world", #消息的内容 properties=pika.BasicProperties( delivery_mode=2, #make message persistent ) ) print("send message: hello world") #关闭连接 connection.close()
consumer:
import pika #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #建立管道 channel = connection.channel() #声明一个queue #由于生产者和消费者程序启动的顺序可能不同,如果哪一方先启动且 #没有声明一个queue,那么这方的程序就会报错,所以为了保险起见, #在两方都声明一个相同的queue queue = channel.queue_declare(queue="alex",durable=True) #alex是queue的名字 #durable=True:make queue persistent def callback(ch,method,properties,body): \'\'\'回调函数,如果收到消息就会调用此函数 ch:管道对象的内存地址 method:消息、及传送对象的相关参数 properties:消息持久化及返回队列等参数信息 body:消息的内容 \'\'\' print("receive message: %s"%body) #print("ch: %s"%ch) #print("method: %s"%method)
ch.basic_ack(delivery_tag=method.delivery_tag) #处理完消息后发送确认给生产者 channel.basic_consume( callback, queue="alex", #从叫做alex的队列中去消息 ) print("waiting message from alex......") channel.start_consuming() #开始消费,若没有消息则会阻塞
消息公平分发
当有多个消费者时,在默认情况下,Rabbit Mq会按顺序将消息发送给各个消费者。这样做有一个缺点:当多个消费者的性能不等的情况下,性能高的消费者很快就能处理完消息从而无事可做,而性能低下的消费者处理不完消息从而积压的消息越来越多。
RabbitMq提出的解决办法就是在各个消费者端配置perfetch=1,意思就是告诉生产者这个消费者端的消息没有处理完,不要往这个消费者发送消息。
consumer:
import pika #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #建立管道 channel = connection.channel() #声明一个queue #由于生产者和消费者程序启动的顺序可能不同,如果哪一方先启动且 #没有声明一个queue,那么这方的程序就会报错,所以为了保险起见, #在两方都声明一个相同的queue queue = channel.queue_declare(queue="alex",durable=True) #alex是queue的名字 #durable=True:make queue persistent def callback(ch,method,properties,body): \'\'\'回调函数,如果收到消息就会调用此函数 ch:管道对象的内存地址 method:消息、及传送对象的相关参数 properties:消息持久化及返回队列等参数信息 body:消息的内容 \'\'\' #print("receive message: %s"%body) #print("ch: %s"%ch) print("method: %s"%method) ch.basic_ack(delivery_tag=method.delivery_tag) #处理完消息后发送确认给生产者 channel.basic_qos(prefetch_count=1) #只要消费者消息没有处理完,生产者就不会往这个消费者发送消息 channel.basic_consume( callback, queue="alex", #从叫做alex的队列中去消息 ) print("waiting message from alex......") channel.start_consuming() #开始消费,若没有消息则会阻塞
Publish\\Subscribe 消息发布、订阅
消息的发布订阅模式简单的来说就是广播模式:一个生产者---->多个消费者,一个生产者生产消息而很多消费者能够消费这个消息。
而实现这一功能是通过Exchange(交换器)实现的,而exchange在定义的时候是有类别的,来决定那些queue符合,可以接收消息。
exchange模式:
fanout:所有绑定到此exchange的queue都可以接收到消息
publiser:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #声明管道 channel = connection.channel() #声明转发器 channel.exchange_declare( exchange="alex", exchange_type="fanout", ) #生产消息 channel.basic_publish( exchange="alex", routing_key="", body="hello world", ) print("send message: hello world") connection.close()
subscriber:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost", ) ) #建立管道 channel = connection.channel() #声明转发器 channel.exchange_declare( exchange="alex", exchange_type="fanout", ) #生成一个随机队列 result = channel.queue_declare(exclusive=True) #在此queue的消费者断开后,会自动删除这个queue #获取生成的queue的名字 queue_name = result.method.queue channel.queue_bind( queue=queue_name, exchange="alex", ) def callback(ch,method,properties,body): print("receive message: ",body) channel.basic_consume( callback, queue=queue_name, ) print("wait for message from %s"%queue_name) channel.start_consuming()
有选择的接收消息(exchange_type=direct)
RabbitMq还支持根据关键字发送,即队列绑定关键字,发送者将根据关键字发送消息到exchange,exchange根据关键字判断将数据发送至指定队列。
简单点说:通过routingKey和exchange决定哪个queue可以接收消息
publisher:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika,random #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #声明管道 channel = connection.channel() #声明转发器 channel.exchange_declare( exchange="DJ", exchange_type="direct", ) key = "3334" #routingKey message = input(">>>:") #生产消息 channel.basic_publish( exchange="DJ", routing_key=key, body=message, ) print("send message: %s"%message) connection.close()
subscriber:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost", ) ) #建立管道 channel = connection.channel() #声明转发器 channel.exchange_declare( exchange="DJ", exchange_type="direct", ) #生成一个随机队列 result = channel.queue_declare(exclusive=True) #在此queue的消费者断开后,会自动删除这个queue #获取生成的queue的名字 queue_name = result.method.queue channel.queue_bind( queue=queue_name, exchange="DJ", routing_key="3334" ) def callback(ch,method,properties,body): print("receive message: ",body) print("routingKey: %s"%method.routing_key) channel.basic_consume( callback, queue=queue_name, ) print("wait for message from %s"%queue_name) channel.start_consuming()
更细致的消息过滤(exchange_type=topic)
所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:
#:代表一个或多个字符
*:代表任何字符
例如:#.a会匹配a.a ab.a abc.a
*a会匹配a.a b.a c.a
publisher:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika,sys #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost" ) ) #声明管道 channel = connection.channel() #声明转发器 channel.exchange_declare( exchange="sss", exchange_type="topic", ) key = sys.argv[1] if len(sys.argv) > 1 else "animal.info" message = " ".join(sys.argv[2:]) or "hello world" #生产消息 channel.basic_publish( exchange="sss", routing_key=key, body=message, ) print("send message: %s"%message) connection.close()
subscriber:
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika,sys #建立连接 connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost", ) ) #建立管道 channel = connection.channel() #声明转发器 channel.exchange_declare( exchange="sss", exchange_type="topic", ) #生成一个随机队列 result = channel.queue_declare(exclusive=True) #在此queue的消费者断开后,会自动删除这个queue #获取生成的queue的名字 queue_name = result.method.queue bind_keys = sys.argv[1:] if not bind_keys: sys.stderr.write("Usage:%s [binding_key]...\\n"%sys.argv[0]) sys.exit(1) for bind_key in bind_keys: channel.queue_bind( exchange="sss", queue=queue_name, routing_key=bind_key, ) def callback(ch,method,properties,body): print("receive message: ",body) print("routingKey: %s"%method.routing_key) channel.basic_consume( callback, queue=queue_name, ) print("wait for message from %s"%queue_name) channel.start_consuming()
headers
通过headers来决定把消息发给那些queue
代码暂定。。。。。。。。。。
远程过程调用:RPC(remote procedure call)
简单来说就是客户端发送命令-----》服务端处理命令,结果-----》客户端
程序逻辑:
client
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika,time,uuid class RpcClient(object): def __init__(self): self.connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost", ) ) self.channel = self.connection.channel() self.result = self.channel.queue_declare(exclusive=True) self.callback_queue = self.result.method.queue self.channel.basic_consume( self.on_response, queue=self.callback_queue, ) def on_response(self,ch,method,properties,body): if properties.correlation_id == self.corr_id: self.response = body.decode() def on_request(self,command): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=command, ) while self.response is None: self.connection.process_data_events() #相当于非阻塞版的start_consume() print("waiting for message......") time.sleep(0.5) return self.response obj = RpcClient() command = input(">>>:") result = obj.on_request(command) print(result) \'\'\' output: >>>:ipconfig waiting for message...... waiting for message...... Windows IP 配置 以太网适配器 以太网: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 3: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 12: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 以太网适配器 VMware Network Adapter VMnet1: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : fe80::7ddd:a3e4:9673:512e%7 IPv4 地址 . . . . . . . . . . . . : 192.168.74.1 子网掩码 . . . . . . . . . . . . : 255.255.255.0 默认网关. . . . . . . . . . . . . : 以太网适配器 VMware Network Adapter VMnet8: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : fe80::4cc1:5dc2:37f:7e7b%16 IPv4 地址 . . . . . . . . . . . . : 192.168.43.1 子网掩码 . . . . . . . . . . . . : 255.255.255.0 默认网关. . . . . . . . . . . . . : 无线局域网适配器 WLAN: 连接特定的 DNS 后缀 . . . . . . . : IPv6 地址 . . . . . . . . . . . . : 2001:da8:215:8f01:8d1d:db29:3fd2:c6d6 临时 IPv6 地址. . . . . . . . . . : 2001:da8:215:8f01:1d2a:e364:2e17:ffa3 本地链接 IPv6 地址. . . . . . . . : fe80::8d1d:db29:3fd2:c6d6%10 IPv4 地址 . . . . . . . . . . . . : 10.122.252.64 子网掩码 . . . . . . . . . . . . : 255.255.192.0 默认网关. . . . . . . . . . . . . : fe80::274:9cff:fe7d:fadb%10 10.122.192.1 \'\'\'
server
# -*- coding:utf-8 -*- #!/user/bin/env.python #Author:Mr Wu import pika,os class RpcServer(object): def __init__(self): self.connection = pika.BlockingConnection( pika.ConnectionParameters( host="localhost", ) ) self.channel = self.connection.channel() self.channel.queue_declare(queue="rpc_queue") self.channel.basic_consume( self.on_response, queue="rpc_queue", ) def on_request(self,command): result = os.popen(command).read() return result def on_response(self,ch,method,properties,body): command = body.decode() return_result = self.on_request(command) self.channel.basic_publish( exchange="", routing_key=properties.reply_to, properties=pika.BasicProperties( correlation_id=properties.correlation_id, ), body=return_result, ) obj = RpcServer() obj.channel.start_consuming()
Redis缓存数据库
介绍
Redis是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。
Redis与其他key-value缓存产品有以下三个特点:
*Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
*Redis不仅仅支持简单的key-value类型的数据,同时还提供list、set、zset、hash等数据结构的存储。
*Redis支持数据的备份,即master-slave模式的数据备份。
优势
*性能极高,Redis能读的速度是110000次/s,写的速度是81000次/s 以上是关于python第十一周:RabbitMQRedis的主要内容,如果未能解决你的问题,请参考以下文章 Python自动化开发学习的第十一周----WEB框架--Django基础 广东海洋大学 电子1151 孔yanfei python语言程序设计 第十一周