第五章
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第五章相关的知识,希望对你有一定的参考价值。
目录
- 消息队列
- redis
1.消息队列
RabbitMQ安装
- 安装erlang http://www.erlang.org/
- 安装rabbitmq http://www.rabbitmq.com/download.html
- 安装 pika https://pika.readthedocs.io/en/0.10.0/
#!/usr/bin/env python
"""Publish one message to the ``hello`` queue through the default exchange."""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = connection.channel()
    # Make sure the target queue exists before publishing to it.
    channel.queue_declare(queue='hello')
    # With the default exchange (''), routing_key names the destination queue.
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print("[x] sent 'Hello World!'")
finally:
    # Close the connection even if declaring or publishing raised.
    connection.close()
#!/usr/bin/env python
"""Consume messages from the ``hello`` queue and print each body."""
__author__ = 'han'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """Print the delivered body, then acknowledge the delivery."""
    print("[x]Received %r" % body)
    # no_ack is left at its default below, so the delivery must be acked
    # explicitly; otherwise it stays unacknowledged and is redelivered.
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello',
                      )  # pass no_ack=True instead to skip acknowledgements
print('[*]Waiting for messages.to exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to quit; stop consuming without a traceback.
    channel.stop_consuming()
finally:
    connection.close()
-
持久化
"""Publish one persistent message to the durable ``hello1`` queue."""
import pika

# Open the connection to the broker.
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
try:
    # Open a channel on that connection.
    channel = connection.channel()
    # durable=True asks the broker to keep the queue definition across restarts.
    channel.queue_declare(queue='hello1', durable=True)
    channel.basic_publish(exchange='',
                          routing_key='hello1',   # destination queue
                          body='Hello World!',    # message payload
                          properties=pika.BasicProperties(
                              delivery_mode=2,    # mark the message persistent
                          ))
    print("[x] sent 'Hello World!'")
finally:
    # Close the connection even if declaring or publishing raised.
    connection.close()
"""Consume from the durable ``hello1`` queue with explicit acknowledgements."""
import pika

# Open the connection to the broker.
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
# Open a channel on that connection.
channel = connection.channel()
# durable=True must match the declaration used by the publisher.
channel.queue_declare(queue='hello1', durable=True)


def callback(ch, method, properties, body):
    """Print the delivered body, then acknowledge it to the broker."""
    print("[x]Received %r" % body)
    # The ack goes to the broker (not to the producer).
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Fair dispatch: allow at most one unacknowledged message on this channel,
# so the next one is only delivered after the current one is acked.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello1',
                      )  # pass no_ack=True instead to skip acknowledgements
print('[*]Waiting for messages.to exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to quit; stop consuming without a traceback.
    channel.stop_consuming()
finally:
    connection.close()
查看消息持久化
cmd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.11\sbin>rabbitmqctl.bat list_queues Listing queues
-
广播
- fanout: 所有bind到此exchange的queue都可以接收消息
- direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
- topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
fanout
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""Broadcast a log line to every queue bound to the ``logs`` fanout exchange."""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
try:
    channel = connection.channel()
    # Fanout mode; some pika versions spell this keyword type="fanout".
    channel.exchange_declare(exchange='logs',
                             exchange_type="fanout")
    # Payload is the command-line arguments joined by spaces, or a default.
    message = ' '.join(sys.argv[1:]) or "info:Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print("[x] Sent %r" % message)
finally:
    # Close the connection even if declaring or publishing raised.
    connection.close()
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""Receive every message broadcast on the ``logs`` fanout exchange."""
import pika
__author__ = 'han'

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
# Fanout mode; some pika versions spell this keyword type="fanout".
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
# No queue name is given, so the broker generates one; exclusive=True makes
# the broker delete the queue once this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue  # the broker-generated queue name
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print each broadcast message."""
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to quit; stop consuming without a traceback.
    channel.stop_consuming()
finally:
    connection.close()
direct
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
"""Emit a log message to ``direct_logs`` using its severity as routing key."""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
try:
    channel = connection.channel()
    # Direct mode: a message reaches the queues bound with its exact routing key.
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    # First argument is the severity (default 'info'); the rest is the message.
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
finally:
    # Close the connection even if declaring or publishing raised.
    connection.close()
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""Receive ``direct_logs`` messages for the severities given on the command line."""
import pika
import sys

# Validate the arguments before opening a connection, so the usage exit
# does not leave a broker connection behind.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1'))
channel = connection.channel()
# Direct mode: a message reaches the queues bound with its exact routing key.
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# One binding per requested severity, all feeding the same private queue.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key (severity) and body of each message."""
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to quit; stop consuming without a traceback.
    channel.stop_consuming()
finally:
    connection.close()
topic
"""Emit a log message to the ``topic_logs`` topic exchange (pattern filtering)."""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
try:
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    # First argument is the routing key (default 'anonymous.info');
    # the remaining arguments form the message.
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
finally:
    # Close the connection even if declaring or publishing raised.
    connection.close()
"""Receive ``topic_logs`` messages matching the binding keys given as arguments."""
import pika
import sys

# Validate the arguments before opening a connection, so the usage exit
# does not leave a broker connection behind.
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# One binding per pattern, all feeding the same private queue.
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key and body of each matching message."""
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to quit; stop consuming without a traceback.
    channel.stop_consuming()
finally:
    connection.close()
1. To receive all the logs run: python receive_logs_topic.py "#" 2. To receive all logs from the facility "kern": python receive_logs_topic.py "kern.*" 3. Or if you want to hear only about "critical" logs: python receive_logs_topic.py "*.critical" 4. You can create multiple bindings: python receive_logs_topic.py "kern.*" "*.critical" 5. And to emit a log with a routing key "kern.critical" type: python emit_log_topic.py "kern.critical" "A critical kernel error"
rpc
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""RPC server: answer Fibonacci requests arriving on ``rpc_queue``."""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1'))
channel = connection.channel()
# Queue on which clients publish their requests.
channel.queue_declare(queue='rpc_queue')


def fib(n):
    """Return the n-th Fibonacci number (fib(0) == 0, fib(1) == 1).

    Computed iteratively, so the cost is linear in ``n`` and there is no
    recursion depth limit to hit.

    Raises:
        ValueError: if ``n`` is negative.
    """
    if n < 0:
        raise ValueError("fib() is undefined for negative n: %r" % n)
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def on_request(ch, method, props, body):
    """Handle one request: compute fib(int(body)) and reply to the caller.

    The reply goes to the queue named in ``props.reply_to`` and carries the
    request's ``correlation_id`` so the client can match it to its call.
    A body that is not a non-negative integer is rejected without requeue,
    so a malformed request cannot crash the server or be redelivered.
    """
    try:
        n = int(body)
        response = fib(n)
    except ValueError as exc:
        print("[!] rejecting bad request %r: %s" % (body, exc))
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    print("[.]fib(%s)" % n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,  # the client's reply queue
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),  # echo the id
                     body=str(response))
    # Acknowledge the request only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(on_request, queue='rpc_queue')
print("[x] Awaiting RPC requests")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    connection.close()
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""RPC client: request a Fibonacci number over ``rpc_queue`` and wait for the reply."""
import pika
import sys
import uuid


class FibonacCiRpcClient(object):
    """Blocking RPC client that uses a private, broker-named reply queue."""

    def __init__(self):
        # Initialise the reply state up front so on_response never reads
        # an attribute that has not been set yet.
        self.response = None
        self.corr_id = None
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='127.0.0.1'))
        self.channel = self.connection.channel()
        # Exclusive queue with a broker-generated name, used for replies.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store the reply body if its correlation id matches the pending call."""
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send ``n`` to the server and block until the matching reply arrives.

        Returns:
            The reply body converted to ``int``.
        """
        self.response = None
        # A fresh id per call lets on_response ignore unrelated replies.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # server's request queue
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id),
                                   body=str(n))
        # Keep processing connection events until on_response sets the reply.
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacCiRpcClient()
try:
    print(" [x] Requesting fib(5)")
    response = fibonacci_rpc.call(5)
    print('[.]Got%r' % response)
finally:
    # Release the broker connection once the demo call is done.
    fibonacci_rpc.connection.close()
2.redis
redis安装
yum install epel-release yum install redis redis-server /etc/redis.conf redis-cli -h 192.168.80.11
-
python执行redis命令
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""Set and read back one string key through a direct redis connection."""
__author__ = 'han'
import redis

r = redis.Redis(host='192.168.80.12', port=6379)
r.set('foo', 'Bar')   # store a string value under the key 'foo'
print(r.get('foo'))
-
链接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认情况下,每个Redis实例都会维护一个自己的连接池。也可以直接建立一个连接池,然后把它作为参数传给Redis,这样就可以实现多个Redis实例共享一个连接池。
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
"""Set and read back one string key using an explicitly created connection pool."""
__author__ = 'han'
import redis

# Create the pool once; it can be passed to several Redis instances so that
# they share the same connections.
pool = redis.ConnectionPool(host='192.168.80.12', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'bar')
print(r.get('foo'))
-
字典
查看修改等..
set name han #设置一个字典 mset naem8 zhang name9 li #设置多个 192.168.80.12:6379> keys * #查看所有 1) "naem" 2) "aeg" 3) "name" 4) "foo" set test2 100 ex 5 #设置5秒 setex name4 3 alex #设置alex3秒 set 2b alex nx #不存在设置 set 2b abc xx #存在设置 getset name9 jack #设置新值并获取原来的值 getrange name9 0 2 #切片 setrange name9 0 l #修改
1. set naem11 alex

>>> ord('a')
97
>>> bin(97)
'0b1100001'

位权:  128 64 32 16  8  4  2  1
97  :    0  1  1  0  0  0  0  1

# 把偏移量为6的位(从左边第0位数起)置为1:01100001 -> 01100011 = 99,即字符 'c'
192.168.80.12:6379> setbit naem11 6 1
(integer) 0

192.168.80.12:6379> get naem11
"clex"
setbit
模拟用户在线 192.168.80.12:6379> setbit n5 1000 1 (integer) 0 192.168.80.12:6379> setbit n5 55 1 (integer) 0 192.168.80.12:6379> setbit n5 600 1 (integer) 0 192.168.80.12:6379> bitcount n5 (integer) 3 共3 192.168.80.12:6379> getbit n5 55 查看 (integer) 1
incrbyfloat
1 #支持小数 2 3 192.168.80.12:6379> set n6 1.0 4 OK 5 192.168.80.12:6379> incrbyfloat n6 1.2 6 "2.2" 7 192.168.80.12:6379> incrbyfloat n6 1.4 8 "3.6"
incr
模拟用户在线 192.168.80.12:6379> incr name8 (integer) 1 192.168.80.12:6379> incr name8 (integer) 2 192.168.80.12:6379> incr name8 加 (integer) 3 192.168.80.12:6379> decr name8 减 (integer) 2 192.168.80.12:6379> decr name8 (integer) 1
append
#追加 192.168.80.12:6379> set name9 zhang 设置字典 OK 192.168.80.12:6379> append name9 san 追加为zhangsan (integer) 8 192.168.80.12:6379> get name9 "zhangsan"
-
Hash
以上是关于第五章的主要内容,如果未能解决你的问题,请参考以下文章