@@@文章内容参照老男孩教育 Alex金角大王,武Sir银角大王@@@
RabbitMQ队列
下载安装http://www.rabbitmq.com/install-windows.html
RabbitMQ 教程官网:http://www.rabbitmq.com/getstarted.html
RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha可以维护很多的队列
几个概念说明:
- Broker:简单来说就是消息队列服务器实体
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
- producer:消息生产者,就是投递消息的程序
- consumer:消息消费者,就是接受消息的程序
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
安装Python RabbitMQ模块
1 pip install pika 2 3 # 源码 https://pypi.python.org/pypi/pika
一、最简单的队列通信
1 # 发送端 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello‘) # 声明queue 8 channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘) 9 print(‘[x] sent "Hello World!"‘) 10 connection.close() 11 12 13 14 # 接收端 15 import pika 16 17 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 18 channel = connection.channel() 19 20 channel.queue_declare(queue=‘hello‘) 21 22 def callback(ch, method, properties, body): 23 print(‘[x] Received %r‘ %body) 24 25 channel.basic_consume(callback, queue=‘hello‘, no_ack=True) 26 27 print(‘[*] Waiting for messages. To exit press CTRL+C‘) 28 channel.start_consuming()
远程连接rabbitmq server
1 # 首先在rabbitmq server上创建一个用户 2 sudo rabbitmqctl add_user admin admin 3 4 # 同时还要配置权限,允许从外面访问 5 sudo rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
客户端连接时需要配置认证参数
1 credentials = pika.PlainCredentials(‘admin‘,‘admin‘) 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘10.0.0.10‘,5672,‘/‘,credentials)) 4 5 channel = connection.channel()
二、Work Queues
在这种模式下,RabbitMQ会默认把P发的消息依次分发给各个消费者(C),跟负载均衡差不多
1 # 消息提供者 2 import pika, time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello2‘) 8 9 message = ‘Hello World! %s‘ %time.ctime() 10 channel.basic_publish(exchange=‘‘, routing_key=‘hello2‘, 11 body=message, 12 properties=pika.BasicProperties(delivery_mode=2,)) # 让消息持久 13 14 print(‘[x] Sent %r‘ %message) 15 connection.close()
1 # 消费者 2 import pika, time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello2‘) 8 9 def callback(ch, method, properties,body): 10 print(‘[x] Received %r‘ %body) 11 time.sleep(10) 12 print(‘[x] Done‘) 13 print(‘method.delivery_tag‘,method.delivery_tag) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 16 channel.basic_consume(callback, queue=‘hello2‘,) 17 18 print(‘[*] Waiting for messages. To exit press CTRL+C‘) 19 channel.start_consuming()
此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上
当RabbitMQ退出或崩溃时,它将会忘记队列和消息,除非您告诉它不要这样做。需要有两件事来确保消息不会丢失:我们需要将队列和消息标记为持久的
我们需要确保RabbitMQ永远不会丢失队列。为了实现这一目的,我们需要将其声明为持久的:
1 channel.queue_declare(queue=‘hello‘, durable=True)
三、消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了
1 channel.basic_qos(prefetch_count=1)
1 # 生产者端 2 import pika, time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello2‘,durable=True) 8 9 message = ‘Hello World! %s‘ %time.ctime() 10 channel.basic_publish(exchange=‘‘, routing_key=‘hello2‘, 11 body=message, 12 properties=pika.BasicProperties(delivery_mode=2,)) 13 14 print(‘[x] Sent %r‘ %message) 15 connection.close()
1 # 消费者端 2 import pika, time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello2‘,durable=True) 8 9 def callback(ch, method, properties,body): 10 print(ch) 11 time.sleep(10) 12 print(‘[x] Received %r‘ %body) 13 print(‘[x] Done‘) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 16 channel.basic_qos(prefetch_count=1) 17 channel.basic_consume(callback, queue=‘hello2‘,) 18 19 print(‘[*] Waiting for messages. To exit press CTRL+C‘) 20 channel.start_consuming()
四、广播模式
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
- fanout:所有bind到此exchange的queue都可以接收消息
- direct:通过routingkey和exchange决定的那个唯一的queue可以接收消息
- topic:所有符合routingkey(此时可以是一个表达式)的routingkey所bind的queue可以接收消息表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout - headers:通headers来决定把消息发给哪些queue
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘logs‘, exchange_type=‘fanout‘) 8 9 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" 10 channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message) 11 print(" [x] Sent %r" % message) 12 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange=‘logs‘, exchange_type=‘fanout‘) 7 8 # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 channel.queue_bind(exchange=‘logs‘, queue=queue_name) 12 13 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 14 15 def callback(ch, method, properties, body): 16 print(" [x] %r" % body) 17 18 channel.basic_consume(callback, queue=queue_name,) 19 20 channel.start_consuming()
五、有选择的接收消息(exchange type=direct)
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判定应该将数据发送至指定队列。
1 import pika, sys 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange=‘direct_logs‘, exchange_type=‘direct‘) 7 8 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ 9 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 10 channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message) 11 print(‘[x] Sent %r:%r‘ %(severity, message)) 12 13 connection.close()
1 import pika, sys 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange=‘direct_logs‘, exchange_type=‘direct‘) 7 result = channel.queue_declare(exclusive=True) 8 queue_name = result.method.queue 9 10 severities = sys.argv[1:] 11 if not severities: 12 sys.stderr.write(‘Usage:%s [info] [warning] [error]\\n‘ %sys.argv[0]) 13 sys.exit(1) 14 15 for severity in severities: 16 channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity) 17 18 print(‘[*] Waiting for logs. To exit press CTRL+C‘) 19 20 def callback(ch,method,properties,body): 21 print(‘[x] %r:%r‘ %(method.routing_key, body)) 22 23 channel.basic_consume(callback, queue=queue_name, no_ack=True) 24 25 channel.start_consuming()
六、更细致的消息过滤
1 import pika, sys 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange=‘topic_logs‘, exchange_type=‘topic‘) 7 8 routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ 9 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 10 11 channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key, body=message) 12 print(‘[x] Sent %r:%r‘ %(routing_key, message)) 13 14 connection.close()
1 import pika, sys 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange=‘topic_logs‘, exchange_type=‘topic‘) 7 result = channel.queue_declare(exclusive=True) 8 queue_name = result.method.queue 9 10 binding_keys = sys.argv[1:] 11 if not binding_keys: 12 sys.stderr.write(‘Usage:%s [binding_key]...\\n‘ %sys.argv[0]) 13 sys.exit(1) 14 15 for binding_key in binding_keys: 16 channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) 17 18 print(‘[*] Waiting for logs. To exit press CTRL+C‘) 19 20 def callback(ch,method,properties,body): 21 print(‘[x] %r:%r‘ %(method.routing_key, body)) 22 23 channel.basic_consume(callback, queue=queue_name, no_ack=True) 24 25 channel.start_consuming()
To receive all the logs run: 要接收所有的日志: python receive_logs_topic.py "#" To receive all logs from the facility "kern": 从设备的“kern”接收所有的日志: python receive_logs_topic.py "kern.*" Or if you want to hear only about "critical" logs: 如果你只想听到关于“关键”的日志: python receive_logs_topic.py "*.critical" You can create multiple bindings: 您可以创建多个绑定 python receive_logs_topic.py "kern.*" "*.critical" And to emit a log with a routing key "kern.critical" type: 然后用一个连接键“kern”发出一个日志。关键”类型: python emit_log_topic.py "kern.critical" "A critical kernel error"
Remote Procedure Call(RPC)
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为调用的方法,该方法发送RPC请求和阻塞,直到收到应答:
1 fibonacci_rpc = FibonacciRpcClient() 2 result = fibonacci_rpc.call(4) 3 print("fib(4) is %r" % result)
1 # RPC server 2 import pika, time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘rpc_queue‘) 8 9 def fib(n): 10 if n == 0: 11 return 0 12 elif n == 1: 13 return 1 14 else: 15 return fib(n-1) + fib(n-2) 16 17 18 def on_request(ch, method, props, body): 19 n = int(body) 20 print(‘[.]fib(%s)‘ %n) 21 response = fib(n) 22 ch.basic_publish(exchange=‘‘, routing_key=props.reply_to, 23 properties=pika.BasicProperties(correlation_id=props.correlation_id), 24 body=str(response)) 25 ch.basic_ack(delivery_tag=method.delivery_tag) 26 27 channel.basic_qos(prefetch_count=1) 28 channel.basic_consume(on_request, queue=‘rpc_queue‘) 29 print(‘[x] Awaiting RPC requests‘) 30 channel.start_consuming()
1 # RPC client 2 import pika, uuid 3 4 class FibonacciRpcClient(object): 5 def __init__(self): 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 7 self.channel =self.connection.channel() 8 result = self.channel.queue_declare(exclusive=True) 9 self.callback_queue = result.method.queue 10 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) 11 12 def on_response(self, ch, method, props, body): 13 if self.corr_id == props.correlation_id: 14 self.response = body 15 16 def call(self, n): 17 self.response = None 18 self.corr_id = str(uuid.uuid4()) 19 self.channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, 20 properties=pika.BasicProperties(reply_to=self.callback_queue, correlation_id=self.corr_id), 21 body=str(n)) 22 while self.response is None: 23 self.connection.process_data_events() 24 return int(self.response) 25 26 fibonacci_rpc = FibonacciRpcClient() 27 28 print(‘[x] Requesting fib(30)‘) 29 response = fibonacci_rpc.call(30) 30 print(‘[.] Got %r‘ %response)
Redis缓存数据库
NoSQL(NoSQL = Not Only SQL),意即“不仅仅是SQL”,泛指非关系型的数据库,随着互联网web2.0网站的兴起,传统的关系数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,暴露了很多难以克服的问题,而非关系型的数据库则由于其本身的特点得到了非常迅速的发展。NoSQL数据库的产生就是为了解决大规模数据集合多重数据种类带来的挑战,尤其是大数据应用难题
NoSQL数据库的四大分类
- 键值(Key-Value)存储数据库 如:Tokyo Cabinet/Tyrant, Redis, Voldemort, Oracle BDB
- 列存储数据库 如:Cassandra, HBase, Riak
- 文档型数据库 如:CouchDB, MongoDb. 国内也有文档型数据库SequoiaDB,已经开源
- 图形(Graph)数据库 如:Neo4J, InfoGrid, Infinite Graph
一、Redis介绍
redis是业界主流的key-value nosql 数据库之一。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步
Redis优点
- 异常快速 : Redis是非常快的,每秒可以执行大约110000设置操作,81000个/每秒的读取操作
- 支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型
- 操作都是原子的 : 所有 Redis 的操作都是原子,从而确保当两个客户同时访问 Redis 服务器得到的是更新后的值(最新值)
- MultiUtility工具:Redis是一个多功能实用工具,可以在很多如:缓存,消息传递队列中使用(Redis原生支持发布/订阅),在应用程序中,如:Web应用程序会话,网站页面点击数等任何短暂的数据
二、安装Redis环境
CentOS7上安装Redis
1.下载Redis $ wget http://download.redis.io/releases/redis-4.0.6.tar.gz 2.解压安装包 $ tar xvfz redis-4.0.6.tar.gz 3.进入解压之后的目录,进行编译 $ cd redis-4.0.6 $ make $ make install 4.启动Redis $redis-server 5.查看Redis是否还在运行 $redis-cli 这将打开一个 Redis 提示符,如下图所示: redis 127.0.0.1:6379> 在上面的提示信息中:127.0.0.1 是本机的IP地址,6379是 Redis 服务器运行的端口。现在输入 PING 命令,如下图所示: redis 127.0.0.1:6379> ping PONG 这说明现在你已经成功地在计算机上安装了 Redis
Python安装Redis
pip install redis
三、Redis API的使用
1、连接方式
操作模式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py
1 import redis 2 3 r = redis.Redis(host=‘10.0.0.10‘, port=6379) 4 r.set(‘foo‘, ‘bar‘) 5 print(r.get(‘foo‘))
连接池
redus-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自已的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。
2、操作
String操作
redis中的String在内存中按照一个name对应一个value来存储。
set(name, value, ex=None, px=None, nx=False, xx=False)
1 # 在Redis中设置值,默认,不存在则创建,存在则修改 2 # 参数: 3 # ex,过期时间(秒) 4 # px,过期时间(毫秒) 5 # nx,如果设置为True,则只有name不存在时,当前set操作才执行 6 # xx,如果设置为True,则只有name存在时,岗前set操作才执行 7 8 set name ‘abc‘
setnx(name, value)
1 # 设置值,只有name不存在时,执行设置操作(添加) 2 setnx names ‘abc‘
setex(name, time, value)
1 # 设置键的值和到期时间 2 # 参数: 3 # time,过期时间(数字秒 或 timedelta对象) 4 setex name 10 ‘abc‘
psetex(name, time_ms, walue)
1 # 在关键字的毫秒内设置值和到期时间 2 # 参数: 3 # time_ms,过期时间(数字毫秒 或 timedelta对象) 4 pestex name 200 ‘abc‘
mset(*args, **kwargs)
1 # 批量设置值 2 mset(k1=‘v1‘, k2=‘v2‘) 3 # or 4 mget({‘k1‘: ‘v1‘, ‘k2‘: ‘v2‘})
get(name)
1 # 获取值 2 get name
mget(keys, *args)
1 # 批量获取值 2 mget name names
getset(name, value)
1 # 设置新值并获取原来的值 2 getset name ‘Abc‘
getrange(key, start, end)
1 # 获取子序列(根据字节获取,非字符) 2 # 参数: 3 # name,Redis 的 name 4 # start,起始位置(字节) 5 # end,结束位置(字节) 6 getrange name 1 3
setrange(name, offset, value)
1 # 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) 2 # 参数: 3 # offset,字符串的索引,字节(一个汉字三个字节) 4 # value,要设置的值 5 setrange name 0 ‘Hello‘
setbit(name, offset, value)
1 # 对name对应值的二进制表示的位进行操作 2 3 # 参数: 4 # name,redis的name 5 # offset,位的索引(将值变换成二进制后再进行索引) 6 # value,值只能是 1 或 0 7 setbit names 1 0
*用途举例,用最省空间的方式,存储在线用户数及分别是哪些用户在线
getbit(name, offset)
1 # 获取name对应的值的二进制表示中的某位的值 (0或1)
bitcount(key, start=None, end=None)
1 # 获取name对应的值的二进制表示中 1 的个数 2 # 参数: 3 # key,Redis的name 4 # start,位起始位置 5 # end,位结束位置 6 bitcount name 1 5
strlen(name)
1 # 返回name对应值的字节长度(一个汉字3个字节) 2 strlen name
incr(self, name, amount=1)
1 # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增 2 # 参数: 3 # name,Redis的name 4 # amount,自增数(必须是整数) 5 # 注:同incrby 6 incr names
incrbyfloat(self, name, amount=1.0)
1 # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增 2 # 参数: 3 # name,Redis的name 4 # amount,自增数(浮点型)
decr(self, name, amount=1)
1 # 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减 2 # 参数: 3 # name,Redis的name 4 # amount,自减数(整数)
append(key, value)
1 # 在redis name对应的值后面追加内容 2 3 # 参数: 4 # key, redis的name 5 # value, 要追加的字符串 6 append names ‘123abc‘
Hash操作
hash表现形式上有些像python中的dict,可以存储一组关联性较强的数据
hset(name, key, value)
1 # name对应的hash中设置一个键值对(不存在,则创建;否则,修改) 2 # 参数: 3 # name,redis的name 4 # key,name对应的hash中的key 5 # value,name对应的hash中的value 6 hset name ‘abc‘ ‘abc123‘ 7 # 注: 8 # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)
hmset(name, mapping)
# 在name对应的hash中批量设置键值对 # 参数: # name,redis的name # mapping,字典,如:{‘k1‘:‘v1‘, ‘k2‘: ‘v2‘} hmset names ‘id‘ 123 ‘age‘ 22
hget(name, key)
1 # 在name对应的hash中获取根据key获取value 2 hget name ‘abc‘
hmget(name, keys, *args)
1 # 在name对应的hash中获取多个key的值 2 # 参数: 3 # name,reids对应的name 4 # keys,要获取key集合,如:[‘k1‘, ‘k2‘, ‘k3‘] 5 # *args,要获取的key,如:k1,k2,k3 6 hmget names ‘id‘ ‘age‘
hgetall(name)
1 # 获取name对应hash的所有键值 2 hgetall names
hlen(name)
1 # 获取name对应的hash中键值对的个数 2 hlen name
hkeys(name)
1 # 获取name对应的hash中所有的key的值 2 hkeys names
hvals(name)
1 # 获取name对应的hash中所有的value的值 2 hvals names
hexists(name, key)
1 # 检查name对应的hash是否存在当前传入的key 2 hexists name ‘abc‘
hdel(name, *keys)
1 # 将name对应的hash中指定key的键值对删除 2 hdel name ‘abc‘
hincrby(name, key, amount=1)
1 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 2 # 参数: 3 # name,redis中的name 4 # key, hash对应的key 5 # amount,自增数(整数) 6 hincrby names ‘age‘ 5
hincrbyfloat(name, key, amount=1.0)
1 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 2 # 参数: 3 # name,redis中的name 4 # key, hash对应的key 5 # amount,自增数(浮点数) 6 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 7 hincrbyfloat names ‘id‘ 0.23
hscan(name, cursor=0, match=None, count=None)
1 # 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据, 2 # 并非一次性将数据全部获取完,从而放置内存被撑爆 3 # 参数: 4 # name,redis的name 5 # cursor,游标(基于游标分批取获取数据) 6 # match,匹配指定key,默认None 表示所有的key 7 # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 8 hscan names 0 match a*
hscan_iter(name, match=None, count=None)
1 # 利用yield封装hscan创建生成器,实现分批去redis中获取数据 2 # 参数: 3 # match,匹配指定key,默认None 表示所有的key 4 # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 5 for item in r.hscan_iter(‘xx‘): 6 print item
List
list操作,redis中的list在内存中按照一个name对应一个list来存储
lpush(name, values)
1 # 在name对应的list中添加元素,每个新的元素都添加到列表的最左边 2 # 如: 3 # r.lpush(‘oo‘, 11,22,33) 4 # 保存顺序为: 33,22,11 5 lpush abc 11 22 33 44 6 7 # 扩展: 8 # rpush(name, values) 表示从右向左操作
lpushx(name, value)
1 # 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边 2 # 更多: 3 # rpushx(name, value) 表示从右向左操作 4 lpushx abc 55 66
llen(name)
1 # name对应的list元素的个数 2 llen abc
linsert(name, where, refvlue, value)
1 # 在name对应的列表的某一个值前或后插入一个新值 2 # 参数: 3 # name,redis的name 4 # where,BEFORE或AFTER 5 # refvalue,标杆值,即:在它前后插入数据 6 # value,要插入的数据 7 linsert abc before 33 123456
lset(name, index, value)
1 # 对name对应的list中的某一个索引位置重新赋值 2 # 参数: 3 # name,redis的name 4 # index,list的索引位置 5 # value,要设置的值 6 lset abc 3 00
lrem(name, num, value)
1 # 在name对应的list中删除指定的值 2 # 参数: 3 # name,redis的name 4 # num, num=0,删除列表中所有的指定值; 5 # num=2,从前到后,删除2个; 6 # num=-2,从后向前,删除2个 7 # value,要删除的值 8 lrem abc 0 00
lpop(name)
1 # 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素 2 lpop abc 3 4 # 更多: 5 # rpop(name) 表示从右向左操作
lindex(name, index)
1 # 在name对应的列表中根据索引获取列表元素 2 lindex abc 1
lrange(name, start, end)
1 # 在name对应的列表分片获取数据 2 # 参数: 3 # name,redis的name 4 # start,索引的起始位置 5 # end,索引结束位置 6 lrange abc 0 -1
ltrim(name, start, end)
1 # 在name对应的列表中移除没有在start-end索引之间的值 2 # 参数: 3 # name,redis的name 4 # start,索引的起始位置 5 # end,索引结束位置 6 ltrim abc 1 3
rpoplpush(src, dst)
1 # 从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边 2 # 参数: 3 # src,要取数据的列表的name 4 # dst,要添加数据的列表的name 5 rpoplpush a b
blpop(keys, timeout)
1 # 将多个列表排列,按照从左到右去pop对应列表的元素 2 # 参数: 3 # keys,redis的name的集合 4 # timeout,超时时间,当元素所有列表的元素获取完之后, 5 # 阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞 6 blpop b 4 7 8 # 更多: 9 # r.brpop(keys, timeout),从右向左获取数据
brpoplpush(src, dst, timeout=0)
1 # 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧 2 # 参数: 3 # src,取出并要移除元素的列表对应的name 4 # dst,要插入元素的列表对应的name 5 # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞 6 brpoplpush a b 4
Set集合操作
set操作,set集合就是不允许重复的列表
sadd(name, values)
1 # name对应的集合中添加元素 2 sadd a 11 22 33 44 55
scard(name)
1 # 获取name对应的集合中元素个数 2 scard a
sdiff(key, *args)
1 # 在第一个name对应的集合中且不在其他name对应的集合的元素集合 2 sdiff a b
sdffstore(dest, keys, *args)
1 # 获取第一个name对应的集合中且不在其他name对应的集合, 2 # 再将其新加入到dest对应的集合中 3 sdiffstore c a b
sinter(keys, *args)
1 # 获取多一个name对应集合的交集 2 sinter a b
sinterstore(dest, keys, *args)
1 # 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中 2 sinterstore n a b
sismember(name, value)
1 # 检查value是否是name对应的集合的成员 2 sismember a 22
smembers(name)
1 # 获取name对应的集合的所有成员 2 smembers a
smove(src, dst, value)
1 # 将某个成员从一个集合中移动到另外一个集合 2 smove a b 11
spop(name)
1 # 从集合的右侧(尾部)移除一个成员,并将其返回 2 spop a
srandmember(name, numbers)
1 # 从name对应的集合中随机获取 numbers 个元素 2 srandmember a 1
srem(name, values)
1 # 在name对应的集合中删除某些值 2 srem a 22 33
sunion(keys, *args)
1 # 获取多一个name对应的集合的并集 2 sunion a b
sunionstore(dest, keys, *args)
1 # 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中 2 sunionstore n a b
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
1 # 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大
有序集合,在集合的基础上,为每个元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序
zadd(name, *args, **kwargs)
1 # 在name对应的有序集合中添加元素 2 # 如: 3 # zadd(‘zz‘, ‘n1‘, 1, ‘n2‘, 2) 4 # 或 5 # zadd(‘zz‘, n1=11, n2=22) 6 zadd abc 5 k1 2 k2 5 k3 1 k4
zcard(name)
1 # 获取name对应的有序集合元素的数量 2 zcard abc
zcount(name, min, max)
1 # 获取name对应的有序集合中分数 在 [min,max] 之间的个数 2 zcount abc 1 3
zincrby(name, amount, value)
1 # 自增name对应的有序集合的 name 对应的分数 2 zincrby abc 2 k1
zrange(name, start, end, desc=False, withscores=False, score_cast_func=float)
1 # 按照索引范围获取name对应的有序集合的元素 2 # 参数: 3 # name,redis的name 4 # start,有序集合索引起始位置(非分数) 5 # end,有序集合索引结束位置(非分数) 6 # desc,排序规则,默认按照分数从小到大排序 7 # withscores,是否获取元素的分数,默认只获取元素的值 8 # score_cast_func,对分数进行数据转换的函数 9 zrange abc 0 -1 10 11 # 更多: 12 # 从大到小排序 13 # zrevrange(name, start, end, withscores=False, score_cast_func=float) 14 # 按照分数范围获取name对应的有序集合的元素 15 # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float) 16 # 从大到小排序 17 # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
zrank(name, value)
1 # 获取某个值在 name对应的有序集合中的排行(从 0 开始) 2 zrank abc k2 3 4 # 更多: 5 # zrevrank(name, value),从大到小排序
zrem(name, value)
1 # 删除name对应的有序集合中值是values的成员 2 zrem abc k1
zremrangebyrank(name, min, max)
1 # 根据排行范围删除 2 zremrangebyrank abc 2 3
zremrangebyscore(name, min, max)
1 # 根据分数范围删除 2 zremrangebyscore abc 1 2
zscore(name, value)
1 # 获取name对应有序集合中 value 对应的分数 2 zscore abc k2
zinterstore(dest, keys, aggregate=None)
1 # 获取两个有序集合的交集,如果遇到相同值,则按照aggregate进行操作 2 # aggregate的值为: SUM MIN MAX
zunionstore(dest, keys, aggregate=None)
1 # 获取两个有序集合的并集,如果遇到相同值,则按照aggregate进行操作 2 # aggregate的值为: SUM MIN MAX
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None, score_cast_func=float)
1 # 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作
其他常用操作
delete(*names)
1 # 根据删除redis中的任意数据类型 2 del abc
exists(name)
1 # 检测redis的name是否存在 2 exists name
keys(pattern=‘*‘)
1 # 根据模型获取redis的name 2 keys * 3 4 # 更多: 5 # KEYS * 匹配数据库中所有 key 。 6 # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 7 # KEYS h*llo 匹配 hllo 和 heeeeello 等。 8 # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name, time)
1 # 为某个redis的某个name设置超时时间 2 expire name 1
rename(src, dst)
1 # 对redis的name重命名为 2 rename abc Abc
move(name, db)
1 # 将redis的某个值移动到指定的db下 2 move name 2
randomkey()
# 随机获取一个redis的name(不删除)
type(name)
1 # 获取name对应值的类型 2 type name
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
1 # 同字符串操作,用于增量迭代获取key
3、管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作
1 import redis 2 3 pool = redis.ConnectionPool(host=‘10.0.0.10‘, port=6379) 4 r = redis.Redis(connection_pool=pool) 5 6 # pipe = r.pipeline(transaction=False) 7 pipe = r.pipeline(transaction=True) 8 9 r.set(‘name‘, ‘jack‘) 10 r.set(‘abc‘,‘123‘) 11 12 pipe.execute()
4、发布订阅
发布者:服务器
订阅者:Dashboad和数据处理
Demo:
1 import redis 2 3 4 class RedisHelper: 5 6 def __init__(self): 7 self.__conn = redis.Redis(host=‘10.0.0.10‘) 8 self.chan_sub = ‘fm104.5‘ 9 self.chan_pub = ‘fm104.5‘ 10 11 def public(self, msg): 12 self.__conn.publish(self.chan_pub, msg) 13 return True 14 15 def subscribe(self): 16 pub = self.__conn.pubsub() 17 pub.subscribe(self.chan_sub) 18 pub.parse_response() 19 return pub
订阅者:
1 from monitor.RedisHelper import RedisHelper 2 3 obj = RedisHelper() 4 redis_sub = obj.subscribe() 5 6 while True: 7 msg= redis_sub.parse_response() 8 print(msg)
发布者:
1 from monitor.RedisHelper import RedisHelper 2 3 obj = RedisHelper() 4 obj.public(‘hello‘)
更多参见:https://github.com/andymccurdy/redis-py/
http://doc.redisfans.com/