Python 11:RabbitMQredis
Posted 水无
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 11:RabbitMQredis相关的知识,希望对你有一定的参考价值。
Python 11:RabbitMQ、redis
1、RabbitMQ
2、缓存数据库:redis
一、RabbitMQ
1、基本信息
市面上的MQ:rabbitMQ、ZeroMQ、ActiveMQ
rabbitMQ是基于erlang 开发的安装前需要先装环境,默认端口5672
2、简单通信
1 import pika
2 connection = pika.BlockingConnection(
3 pika.ConnectionParameters(‘localhost‘))
4 channel = connection.channel() #声明一个管道
5 #声明queue
6 channel.queue_declare(queue=‘hello‘)
7 channel.basic_publish(exchange=‘‘,
8 routing_key=‘hello‘, #queue 名字
9 body=‘Hello World!‘) #消息内容
10 print(" [x] Sent ‘Hello World!‘")
11 connection.close()
1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
3 channel = connection.channel()
4 ‘‘‘
5 已经在produce里声明队列了,为什么这里还要再次声明?
6 如果你确认这个queue已经确实被声明过了,那就可以不写
7 但是我们不确认是send先运行还是receive先运行
8 所以在这里再次声明一下,如果先前已经声明过了,这里就会pass,如果没有声明,这里就会起到声明queue的作用
9 ‘‘‘
10 channel.queue_declare(queue=‘hello‘) #再次声明队列
11 def callback(ch, method, properties, body):
12 print(" [x] Received %r" % body)
13 ch.basic_ack(delivery_tag=method.delivery_tag) #与发送方确认收到消息(参考no_ack = True)
14 channel.basic_consume( #消费消息
15 callback, #如果收到消息就调用callback函数处理消息
16 queue=‘hello‘,
17 no_ack=True) #no ackownledgement #不确认消息,接收端无论是否收到消息都不与发送端确认
18 print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
19 channel.start_consuming()
在这种模式下,RabbitMQ会默认把send发的消息依次分发给各个receive,跟负载均衡差不多
3、消息的持久化
channel.queue_declare(queue
=
‘hello‘
, durable
=
True
)
发送方接收方管道都要加上:durable
=
True参数,作用:持久化queue,rabbitMQ宕机后queue仍会存在,但是消息会丢失
channel.basic_publish(exchange
=
‘‘,r
outing_key
=
"task_queue"
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
))
properties参数
:properties
=
pika.BasicProperties(
delivery_mode
=
2,)
delivery_mode
=
2 就表示把queue中的消息持久化
channel.basic_qos(prefetch_count
=
1)
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 4 channel = connection.channel() 5 channel.queue_declare(queue=‘task_queue‘, durable=True) 6 message = ‘ ‘.join(sys.argv[1:]) or "Hello World!" 7 channel.basic_publish(exchange=‘‘,routing_key=‘task_queue‘, body=message,properties=pika.BasicProperties(delivery_mode = 2,)) 8 print(" [x] Sent %r" % message) 9 connection.close()
1 import pika 2 import time 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 4 channel = connection.channel() 5 channel.queue_declare(queue=‘task_queue‘, durable=True) 6 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 7 def callback(ch, method, properties, body): 8 print(" [x] Received %r" % body) 9 time.sleep(body.count(b‘.‘)) 10 print(" [x] Done") 11 ch.basic_ack(delivery_tag = method.delivery_tag) 12 channel.basic_qos(prefetch_count=1) 13 channel.basic_consume(callback,queue=‘task_queue‘) 14 channel.start_consuming()
5、广播
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 4 channel = connection.channel() 5 #在producer和consumer中分别声明一次以保证所要使用的exchange存在 6 channel.exchange_declare(exchange=‘logs‘,exchange_type= ‘fanout‘) 7 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" 8 channel.basic_publish(exchange=‘logs‘,routing_key=‘‘,body=message) 9 print(" [x] Sent %r" % message) 10 connection.close()
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 3 channel = connection.channel() 4 channel.exchange_declare(exchange=‘logs‘,exchange_type=‘fanout‘) #在send和receive中分别声明一次以保证所要使用的exchange存在 5 #在不同的send和receive间共享queue时指明queue的name是重要的 6 #但某些时候,比如日志系统,需要接收所有的log message而非一个子集 7 #而且仅对当前的message 流感兴趣,对于过时的message不感兴趣,那么 8 #可以申请一个临时队列这样,每次连接到RabbitMQ时会以一个随机的名字生成 9 #一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue 10 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 11 queue_name = result.method.queue #用于获取临时queue的name 12 channel.queue_bind(exchange=‘logs‘,queue=queue_name) 13 #exchange与queue之间的关系成为binding 14 #binding告诉exchange将message发送该哪些queue 15 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 16 def callback(ch, method, properties, body): 17 print(" [x] %r" % body) 18 #从指定地queue中consume message且不确认 19 channel.basic_consume(callback,queue=queue_name,no_ack=True) 20 channel.start_consuming()
TypeError: exchange_declare() got an unexpected keyword argument ‘type‘问题
解决办法:pika版本不同导致的用法不同,把type换成exchange_type
2、direct:有选择的接受消息(exchange_type=direct)
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 4 channel = connection.channel() 5 channel.exchange_declare(exchange=‘direct_logs‘,exchange_type=‘direct‘) 6 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ 7 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 8 channel.basic_publish(exchange=‘direct_logs‘,routing_key=severity,body=message) 9 print(" [x] Sent %r:%r" % (severity, message)) 10 connection.close()
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 channel.exchange_declare(exchange=‘direct_logs‘,exchange_type=‘direct‘) 7 result = channel.queue_declare(exclusive=True) 8 queue_name = result.method.queue 9 severities = sys.argv[1:] 10 if not severities: 11 sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 12 sys.exit(1) 13 for severity in severities: 14 channel.queue_bind(exchange=‘direct_logs‘,queue=queue_name,routing_key=severity) 15 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 16 def callback(ch, method, properties, body): 17 print(" [x] %r:%r" % (method.routing_key, body)) 18 channel.basic_consume(callback,queue=queue_name,no_ack=True) 19 channel.start_consuming()
接收端启动必须带参数:python receive.py info warning
3、topic:更细致的消息过滤
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) 4 channel = connection.channel() 5 channel.exchange_declare(exchange=‘topic_logs‘,exchange_type=‘topic‘) 6 routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ 7 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 8 channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key,body=message) 9 print(" [x] Sent %r:%r" % (routing_key, message)) 10 connection.close()
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 4 channel = connection.channel() 5 channel.exchange_declare(exchange=‘topic_logs‘,exchange_type=‘topic‘) 6 result = channel.queue_declare(exclusive=True) 7 queue_name = result.method.queue 8 binding_keys = sys.argv[1:] 9 if not binding_keys: 10 sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) 11 sys.exit(1) 12 for binding_key in binding_keys: 13 channel.queue_bind(exchange=‘topic_logs‘,queue=queue_name,routing_key=binding_key) 14 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 15 def callback(ch, method, properties, body): 16 print(" [x] %r:%r" % (method.routing_key, body)) 17 channel.basic_consume(callback,queue=queue_name,no_ack=True) 18 channel.start_consuming()
接收端启动必须带参数:python receive.py info *.warning *.rabbit
6、rpc(Remote procedure call)
1 import pika 2 import time 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host=‘localhost‘)) 5 channel = connection.channel() 6 channel.queue_declare(queue=‘rpc_queue‘) 7 def fib(n): 8 if n == 0: 9 return 0 10 elif n == 1: 11 return 1 12 else: 13 return fib(n-1) + fib(n-2) 14 def on_request(ch, method, props, body): 15 n = int(body) 16 print(" [.] fib(%s)" % n) 17 response = fib(n) 18 ch.basic_publish(exchange=‘‘, 19 routing_key=props.reply_to, 20 properties=pika.BasicProperties(correlation_id = 21 props.correlation_id), 22 body=str(response)) 23 ch.basic_ack(delivery_tag = method.delivery_tag) 24 channel.basic_qos(prefetch_count=1) 25 channel.basic_consume(on_request, queue=‘rpc_queue‘) 26 print(" [x] Awaiting RPC requests") 27 channel.start_consuming()
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
二、缓存数据库:redis
1、介绍
NoSQL(NoSQL = Not Only SQL ),泛指非关系型的数据库,NoSQL数据库的产生就是为了解决大规模数据集合多重数据种类带来的挑战,尤其是大数据应用难题。
NoSQL数据库在以下的这几种情况下比较适用:
1、数据模型比较简单;
2、需要灵活性更强的IT系统;
3、对数据库性能要求较高;
4、不需要高度的数据一致性;
5、对于给定key,比较容易映射复杂值的环境。
2、分类
分类 | 举例 | 应用场景 | 数据模型 | 优点 | 缺点 |
键值(key-value) | Tokyo Cabinet/Tyrant, Redis, Voldemort, Oracle BDB | 内容缓存,主要用于处理大量数据的高访问负载,也用于一些日志系统等等 | Key 指向 Value 的键值对,通常用hash table来实现 | 查找速度快 | 数据无结构化,通常只被当作字符串或者二进制数据 |
列存储数据库 | Cassandra, HBase, Riak | 分布式的文件系统 | 以列簇式存储,将同一列数据存在一起 | 查找速度快,可扩展性强,更容易进行分布式扩展 | 功能相对局限 |
文档型数据库 | CouchDB, MongoDb | Web应用(与Key-Value类似,Value是结构化的,不同的是数据库能够了解Value的内容) | Key-Value对应的键值对,Value为结构化数据 | 数据结构要求不严格,表结构可变,不需要像关系型数据库一样需要预先定义表结构 | 查询性能不高,而且缺乏统一的查询语法。 |
图形(Graph)数据库 | Neo4J, InfoGrid, Infinite Graph | 社交网络,推荐系统等。专注于构建关系图谱 | 图结构 | 利用图结构相关算法。比如最短路径寻址,N度关系查找等 | 很多时候需要对整个图做计算才能得出需要的信息,而且这种结构不太好做分布式的集群方案。 |
3、redis介绍
redis是主流的key-value nosql 数据库之一。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis优点
1、异常快速 :因为数据存在内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1)
2、支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型。
3、操作都是原子的 :所谓的原子性就是对数据的更改要么全部执行,要么全部不执行。
4、丰富的特性:可用于缓存,消息,按key设置过期时间,过期后将会自动删除
4、redis使用Python链接
1 import redis 2 r = redis.Redis(host=‘localhost‘, port=6379) 3 r.set(‘key‘, ‘value‘) 4 print r.get(‘key‘)
1 import redis 2 pool = redis.ConnectionPool(host=‘localhost‘, port=6379) 3 r = redis.Redis(connection_pool=pool) 4 r.set(‘key‘, ‘value‘) 5 print r.get(‘key‘)
5、String操作
redis中的String在在内存中按照一个name对应一个value来存储
1 #set(name, value, ex=None, px=None, nx=False, xx=False) 2 ‘‘‘ 3 在Redis中设置值,默认,不存在则创建,存在则修改 4 参数: 5 ex,过期时间(秒) 6 px,过期时间(毫秒) 7 nx,如果设置为True,则只有name不存在时,当前set操作才执行 8 xx,如果设置为True,则只有name存在时,岗前set操作才执行 9 ‘‘‘ 10 11 # setnx(name, value) 12 ‘‘‘设置值,只有name不存在时,执行设置操作(添加)‘‘‘ 13 14 #setex(name, value, time) 15 ‘‘‘设置值、参数、time:过期时间(数字秒 或 timedelta对象)‘‘‘ 16 17 #psetex(name, time_ms, value) 18 ‘‘‘设置值、参数、time_ms,过期时间(数字毫秒 或 timedelta对象)‘‘‘ 19 20 #mset(*args, **kwargs) 21 ‘‘‘批量设置值: 22 mset(k1=‘v1‘, k2=‘v2‘) 23 mget({‘k1‘: ‘v1‘, ‘k2‘: ‘v2‘})‘‘‘ 24 25 #get(name) 获取值 26 27 #mget(keys, *args) 28 ‘‘‘批量获取 29 mget(‘ylr‘, ‘wupeiqi‘) 30 r.mget([‘ylr‘, ‘wupeiqi‘])‘‘‘ 31 32 #getset(name, value) 设置新值并获取原来的值 33 34 #getrange(key, start, end) 35 ‘‘‘ 获取子序列(根据字节获取,非字符) 36 参数: 37 name,Redis 的 name 38 start,起始位置(字节) 39 end,结束位置(字节) 40 ‘‘‘ 41 # setrange(name, offset, value) 42 ‘‘‘修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) 43 参数: 44 offset,字符串的索引,字节(一个汉字三个字节) 45 value,要设置的值‘‘‘ 46 47 # strlen(name) 48 ‘‘‘返回name对应值的字节长度(一个汉字3个字节)‘‘‘ 49 50 # incr(self, name, amount=1) 51 ‘‘‘自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 52 参数: 53 name,Redis的name 54 amount,自增数(必须是整数)‘‘‘ 55 56 # 注:同incrby 57 # incrbyfloat(self, name, amount=1.0) 58 ‘‘‘自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 59 参数: 60 name,Redis的name 61 amount,自增数(浮点型)‘‘‘ 62 63 # decr(self, name, amount=1) 64 ‘‘‘自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。 65 参数: 66 name,Redis的name 67 amount,自减数(整数)‘‘‘ 68 69 # append(key, value) 70 ‘‘‘在redis name对应的值后面追加内容 71 参数: 72 key, redis的name 73 value, 要追加的字符串‘‘‘ 74 75 76 # setbit(name, offset, value) 77 ‘‘‘(对name对应值的二进制表示的位进行操作 78 参数: 79 name,redis的name 80 offset,位的索引(将值变换成二进制后再进行索引) 81 value,值只能是 1 或 0)‘‘‘ 82 83 # getbit(name, offset) 84 ‘‘‘(获取name对应的值的二进制表示中的某位的值 (0或1))‘‘‘ 85 86 # bitcount(key, start=None, end=None) 87 ‘‘‘(获取name对应的值的二进制表示中 1 的个数 88 参数: 89 key,Redis的name 90 start,位起始位置 91 end,位结束位置) 92 ‘‘‘ 93 import redis 94 r = redis.Redis(host=‘localhost‘, port=6379) 95 r.set(‘n1‘, ‘abc‘) 96 ord("a") #97 97 print(bin(97)) #0b1100001 98 r.setbit("n1",6,1) #把二进制的第六为改为1, a>c 99 print(r.get("n1")) #b‘cbc‘ 100 101 print(r.getbit("n1",5)) #0 获取第5位的值 102 103 print(r.bitcount("n1")) # 11 "cbc"对应的二进制中有多少个1 104 105 #应用:用最省空间的方式,存储在线用户数及分别是哪些用户在线
6、Hash操作
hash表现形式上有些像pyhton中的dict,可以存储一组关联性较强的数据
1 # hset(name, key, value) 2 ‘‘‘name对应的hash中设置一个键值对(不存在,则创建;否则,修改) 3 参数: 4 name,redis的name 5 key,name对应的hash中的key 6 value,name对应的hash中的value 7 注: 8 hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)‘‘‘ 9 10 # hmset(name, mapping) 11 ‘‘‘在name对应的hash中批量设置键值对 12 参数: 13 name,redis的name 14 mapping,字典,如:{‘k1‘:‘v1‘, ‘k2‘: ‘v2‘} 15 如: 16 r.hmset(‘xx‘, {‘k1‘:‘v1‘, ‘k2‘: ‘v2‘})‘‘‘ 17 18 # hget(name,key) 19 ‘‘‘在name对应的hash中获取根据key获取value‘‘‘ 20 21 # hmget(name, keys, *args) 22 ‘‘‘‘在name对应的hash中获取多个key的值 23 参数: 24 name,reids对应的name 25 keys,要获取key集合,如:[‘k1‘, ‘k2‘, ‘k3‘] 26 *args,要获取的key,如:k1,k2,k3 27 如: 28 r.mget(‘xx‘, [‘k1‘, ‘k2‘]) 29 或 30 print r.hmget(‘xx‘, ‘k1‘, ‘k2‘)‘‘‘ 31 32 # hgetall(name) 33 ‘‘‘获取name对应hash的所有键值‘‘‘ 34 35 # hlen(name) 36 ‘‘‘获取name对应的hash中键值对的个数‘‘‘ 37 38 # hkeys(name) 39 ‘‘‘获取name对应的hash中所有的key的值‘‘‘ 40 41 # hvals(name) 42 ‘‘‘获取name对应的hash中所有的value的值‘‘‘ 43 44 # hexists(name, key) 45 ‘‘‘检查name对应的hash是否存在当前传入的key‘‘‘ 46 47 # hdel(name,*keys) 48 ‘‘‘将name对应的hash中指定key的键值对删除‘‘‘ 49 50 # hincrby(name, key, amount=1) 51 ‘‘‘自增name对应的hash中的指定key的值,不存在则创建key=amount 52 参数: 53 name,redis中的name 54 key, hash对应的key 55 amount,自增数(整数)‘‘‘ 56 # hincrbyfloat(name, key, amount=1.0) 57 ‘‘‘自增name对应的hash中的指定key的值,不存在则创建key=amount 58 参数: 59 name,redis中的name 60 key, hash对应的key 61 amount,自增数(浮点数) 62 自增name对应的hash中的指定key的值,不存在则创建key=amount‘‘‘ 63 64 # hscan(name, cursor=0, match=None, count=None) 65 ‘‘‘增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆 66 参数: 67 name,redis的name 68 cursor,游标(基于游标分批取获取数据) 69 match,匹配指定key,默认None 表示所有的key 70 count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 71 如: 72 第一次:cursor1, data1 = r.hscan(‘xx‘, cursor=0, match=None, count=None) 73 第二次:cursor2, data1 = r.hscan(‘xx‘, cursor=cursor1, match=None, count=None) 74 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕‘‘‘ 75 76 # hscan_iter(name, match=None, count=None) 77 ‘‘‘利用yield封装hscan创建生成器,实现分批去redis中获取数据 78 参数: 79 match,匹配指定key,默认None 表示所有的key 80 count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 81 如: 82 for item in r.hscan_iter(‘xx‘): 83 print item 84 ‘‘‘
7、List操作
redis中的List在在内存中按照一个name对应一个List来存储
1 # lpush(name,values) 2 ‘‘‘在name对应的list中添加元素,每个新的元素都添加到列表的最左边 3 如: 4 r.lpush(‘oo‘, 11,22,33) 5 保存顺序为: 33,22,11 6 扩展: 7 rpush(name, values) 表示从右向左操作‘‘‘ 8 # lpushx(name,value) 9 ‘‘‘在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边 10 更多: 11 rpushx(name, value) 表示从右向左操作‘‘‘ 12 # llen(name) 13 ‘‘‘name对应的list元素的个数‘‘‘ 14 # linsert(name, where, refvalue, value)) 15 ‘‘‘在name对应的列表的某一个值前或后插入一个新值 16 参数: 17 name,redis的name 18 where,BEFORE或AFTER 19 refvalue,标杆值,即:在它前后插入数据 20 value,要插入的数据‘‘‘ 21 # r.lset(name, index, value) 22 ‘‘‘对name对应的list中的某一个索引位置重新赋值 23 参数: 24 name,redis的name 25 index,list的索引位置 26 value,要设置的值‘‘‘ 27 # r.lrem(name, num, value) 28 ‘‘‘在name对应的list中删除指定的值 29 参数: 30 name,redis的name 31 value,要删除的值 32 num, num=0,删除列表中所有的指定值; 33 # num=2,从前到后,删除2个; 34 # num=-2,从后向前,删除2个‘‘‘ 35 # lpop(name) 36 ‘‘‘在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素 37 更多: 38 # rpop(name) 表示从右向左操作‘‘‘ 39 # lindex(name, index) 40 ‘‘‘在name对应的列表中根据索引获取列表元素‘‘‘ 41 # lrange(name, start, end) 42 ‘‘‘在name对应的列表分片获取数据 43 # 参数: 44 # name,redis的name 45 # start,索引的起始位置 46 # end,索引结束位置‘‘‘ 47 # ltrim(name, start, end) 48 ‘‘‘在name对应的列表中移除没有在start-end索引之间的值 49 参数: 50 name,redis的name 51 start,索引的起始位置 52 end,索引结束位置‘‘‘ 53 # rpoplpush(src, dst) 54 ‘‘‘从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边 55 参数: 56 src,要取数据的列表的name 57 dst,要添加数据的列表的name‘‘‘ 58 # blpop(keys, timeout) 59 ‘‘‘将多个列表排列,按照从左到右去pop对应列表的元素 60 参数: 61 keys,redis的name的集合 62 timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞 63 更多: 64 r.brpop(keys, timeout),从右向左获取数据‘‘‘ 65 # brpoplpush(src, dst, timeout=0) 66 ‘‘‘从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧 67 参数: 68 src,取出并要移除元素的列表对应的name 69 dst,要插入元素的列表对应的name 70 timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞‘‘‘
8、Set操作
Set集合就是不允许重复的列表
1 # sadd(name,values) 2 ‘‘‘name对应的集合中添加元素‘‘‘ 3 # scard(name) 4 ‘‘‘获取name对应的集合中元素个数‘‘‘ 5 # sdiff(keys, *args) 6 ‘‘‘在第一个name对应的集合中且不在其他name对应的集合的元素集合‘‘‘ 7 # sdiffstore(dest, keys, *args) 8 ‘‘‘获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中‘‘‘ 9 # sinter(keys, *args) 10 ‘‘‘获取多一个name对应集合的交集‘‘‘ 11 # sinterstore(dest, keys, *args) 12 ‘‘‘获取多一个name对应集合的交集,再讲其加入到dest对应的集合中‘‘‘ 13 # sismember(name, value) 14 ‘‘‘检查value是否是name对应的集合的成员‘‘‘ 15 # smembers(name) 16 ‘‘‘获取name对应的集合的所有成员‘‘‘ 17 # smove(src, dst, value) 18 ‘‘‘将某个成员从一个集合中移动到另外一个集合‘‘‘ 19 # spop(name) 20 ‘‘‘从集合的右侧(尾部)移除一个成员,并将其返回‘‘‘ 21 # srandmember(name, numbers) 22 ‘‘‘从name对应的集合中随机获取 numbers 个元素‘‘‘ 23 # srem(name, values) 24 ‘‘‘在name对应的集合中删除某些值‘‘‘ 25 # sunion(keys, *args) 26 ‘‘‘获取多一个name对应的集合的并集‘‘‘ 27 # sunionstore(dest,keys, *args) 28 ‘‘‘获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中‘‘‘ 29 # sscan(name, cursor=0, match=None, count=None) 30 # sscan_iter(name, match=None, count=None) 31 ‘‘‘同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大‘‘‘
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序
1 # zadd(name, *args, **kwargs) 2 ‘‘‘在name对应的有序集合中添加元素 3 如: 4 zadd(‘zz‘, ‘n1‘, 1, ‘n2‘, 2) 5 或 6 zadd(‘zz‘, n1=11, n2=22)‘‘‘ 7 # zcard(name) 8 ‘‘‘获取name对应的有序集合元素的数量‘‘‘ 9 # zcount(name, min, max) 10 ‘‘‘获取name对应的有序集合中分数 在 [min,max] 之间的个数‘‘‘ 11 # zincrby(name, value, amount) 12 ‘‘‘自增name对应的有序集合的 name 对应的分数‘‘‘ 13 # r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float) 14 ‘‘‘按照索引范围获取name对应的有序集合的元素 15 参数: 16 name,redis的name 17 start,有序集合索引起始位置(非分数) 18 end,有序集合索引结束位置(非分数) 19 desc,排序规则,默认按照分数从小到大排序 20 withscores,是否获取元素的分数,默认只获取元素的值 21 score_cast_func,对分数进行数据转换的函数 22 更多: 23 从大到小排序 24 zrevrange(name, start, end, withscores=False, score_cast_func=float) 25 按照分数范围获取name对应的有序集合的元素 26 zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float) 27 从大到小排序 28 zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)‘‘‘ 29 # zrank(name, value) 30 ‘‘‘获取某个值在 name对应的有序集合中的排行(从 0 开始) 31 更多: 32 zrevrank(name, value),从大到小排序‘‘‘ 33 # zrem(name, values) 34 ‘‘‘删除name对应的有序集合中值是values的成员 35 如:zrem(‘zz‘, [‘s1‘, ‘s2‘])‘‘‘ 36 # zremrangebyrank(name, min, max) 37 ‘‘‘根据排行范围删除‘‘‘ 38 # zremrangebyscore(name, min, max) 39 ‘‘‘根据分数范围删除‘‘‘ 40 # zscore(name, value) 41 ‘‘‘获取name对应有序集合中 value 对应的分数‘‘‘ 42 # zinterstore(dest, keys, aggregate=None) 43 ‘‘‘获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作 44 aggregate的值为: SUM MIN MAX‘‘‘ 45 # zunionstore(dest, keys, aggregate=None) 46 ‘‘‘获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作 47 aggregate的值为: SUM MIN MAX‘‘‘ 48 # zscan(name, cursor=0, match=None, count=None, score_cast_func=float) 49 # zscan_iter(name, match=None, count=None,score_cast_func=float) 50 ‘‘‘同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作‘‘‘
9、其他常用操作
1 # delete(*names) 2 ‘‘‘根据删除redis中的任意数据类型‘‘‘ 3 # exists(name) 4 ‘‘‘检测redis的name是否存在‘‘‘ 5 # keys(pattern=‘*‘) 6 ‘‘‘根据模型获取redis的name 7 更多: 8 KEYS * 匹配数据库中所有 key 。 9 KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 10 KEYS h*llo 匹配 hllo 和 heeeeello 等。 11 KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo‘‘‘ 12 # expire(name ,time) 13 ‘‘‘为某个redis的某个name设置超时时间‘‘‘ 14 # rename(src, dst) 15 ‘‘‘对redis的name重命名为‘‘‘ 16 # move(name, db)) 17 ‘‘‘将redis的某个值移动到指定的db下‘‘‘ 18 # randomkey() 19 ‘‘‘随机获取一个redis的name(不删除)‘‘‘ 20 # type(name) 21 ‘‘‘获取name对应值的类型‘‘‘ 22 # scan(cursor=0, match=None, count=None) 23 # scan_iter(match=None, count=None) 24 ‘‘‘同字符串操作,用于增量迭代获取key‘‘‘
10、管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作
1 import redis 2 3 pool = redis.ConnectionPool(host=‘localhost‘, port=6379) 4 r = redis.Redis(connection_pool=pool) 5 # pipe = r.pipeline(transaction=False) 6 pipe = r.pipeline(transaction=True) 7 pipe.set(‘n1‘, ‘aaa‘) 8 pipe.set(‘n2‘, ‘bbb‘) 9 10 pipe.execute()
以上是关于Python 11:RabbitMQredis的主要内容,如果未能解决你的问题,请参考以下文章