python redis连接 线程安全么
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python redis连接 线程安全么相关的知识,希望对你有一定的参考价值。
参考技术A 在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
?
1
2
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
r.xxxx()
有了ConnectionPool这个类之后,可以使用如下方法
?
1
2
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
r = redis.Redis(connection_pool=pool)
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class StrictRedis(object):
........
def __init__(self, host='localhost', port=6379,
db=0, password=None, socket_timeout=None,
socket_connect_timeout=None,
socket_keepalive=None, socket_keepalive_options=None,
connection_pool=None, unix_socket_path=None,
encoding='utf-8', encoding_errors='strict',
charset=None, errors=None,
decode_responses=False, retry_on_timeout=False,
ssl=False, ssl_keyfile=None, ssl_certfile=None,
ssl_cert_reqs=None, ssl_ca_certs=None):
if not connection_pool:
..........
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
connection = pool.get_connection(command_name, **options) #调用ConnectionPool.get_connection方法获取一个连接
try:
connection.send_command(*args) #命令执行,这里为Connection.send_command
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
pool.release(connection) #调用ConnectionPool.release释放连接
在来看看ConnectionPool类:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class ConnectionPool(object):
...........
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs): #类初始化时调用构造函数
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0: #判断输入的max_connections是否合法
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class #设置对应的参数
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.reset() #初始化ConnectionPool 时的reset操作
def reset(self):
self.pid = os.getpid()
self._created_connections = 0 #已经创建的连接的计数器
self._available_connections = [] #声明一个空的数组,用来存放可用的连接
self._in_use_connections = set() #声明一个空的集合,用来存放已经在用的连接
self._check_lock = threading.Lock()
.......
def get_connection(self, command_name, *keys, **options): #在连接池中获取连接的方法
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop() #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
会直接调用make_connection方法
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection) #向代表正在使用的连接的集合中添加元素
return connection
def make_connection(self): #在_available_connections数组为空时获取连接调用的方法
"Create a new connection"
if self._created_connections >= self.max_connections: #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
raise ConnectionError("Too many connections")
self._created_connections += 1 #把代表已经创建的连接的数值+1
return self.connection_class(**self.connection_kwargs) #返回有效的连接,默认为Connection(**self.connection_kwargs)
def release(self, connection): #释放连接,链接并没有断开,只是存在链接池中
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection) #从集合中删除元素
self._available_connections.append(connection) #并添加到_available_connections 的数组中
def disconnect(self): #断开所有连接池中的链接
"Disconnects all connections in the pool"
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()
execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
?
1
2
3
4
5
6
7
class Connection(object):
"Manages TCP communication to and from a Redis server"
def __del__(self): #对象删除时的操作,调用disconnect释放连接
try:
self.disconnect()
except Exception:
pass
核心的链接建立方法是通过socket模块实现:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _connect(self):
err = None
for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP_KEEPALIVE
if self.socket_keepalive: #构造函数中默认 socket_keepalive=False,因此这里默认为短连接
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in iteritems(self.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# set the socket_connect_timeout before we connect
sock.settimeout(self.socket_connect_timeout) #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
sock.settimeout(self.socket_timeout) #构造函数中默认socket_timeout=None
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
.....
关闭链接的方法:
?
1
2
3
4
5
6
7
8
9
10
11
def disconnect(self):
"Disconnects from the Redis server"
self._parser.on_disconnect()
if self._sock is None:
return
try:
self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close
self._sock.close()
except socket.error:
pass
self._sock = None
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。本回答被提问者采纳
使用python操作redis(管道)
一、redis连接
redis提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
redis连接实例是线程安全的,可以直接将redis连接实例设置为一个全局变量,直接使用。如果需要另一个Redis实例(or Redis数据库)时,就需要重新创建redis连接实例来获取一个新的连接。
安装redis
pip install redis
连接redis,加上decode_responses=True,写入的键值对中的value为str类型,不加这个参数写入的则为字节类型。
二、连接池连接
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池
连接池:redis_pool.py
import redis
from redis_pool import POOl
conn = redis.Redis(connection_pool=POOl)
conn.set(‘name‘, ‘LinWOW‘)
print(conn.get(‘name‘))
三、redis-py 操作redis
python操作redis的命令和命令行几乎一致,除了del,因为和关键字重叠,所以用delete
1、字符串类型操作:
set----键key
con.set(‘key‘,‘values‘)
get---获取数值
con.get(‘com‘)
append --- 追加
con.append(‘keys‘,‘values‘)
delete---删除
con.delete(‘keys‘)
2、List类型:
rpush 、lpush----添加数据
con.rpush(‘keys‘,‘values‘)
con.lpush(‘keys‘,‘values‘)
lrange---查看数据
con.lrange(‘list‘,0,11)
lset---修改数据
con.lset(‘list_F‘,1,‘Fyn‘)---返回布尔值
rpop ---- 随机删除一个数据
con.rpop(‘keys‘)
lrem ---- 指定删除一个数据时
con.lrem(‘list_f‘,0,‘okl‘)
3、Hash类型
hmget --- 添加多条数据,也可插入单条数据
以字典的形式插入
con.hmste(‘keys‘,{‘name‘:‘‘age})#插入结果返回布尔值
hget , hgetall , hvals,hkeys查看数据
con.hget(‘ok‘,‘name‘)
con.hgetall(‘ok‘)#结果返回字典
con.hvals(‘ok‘)#获取所有的键值
con.hkeys(‘ok‘)#获取所有的键
hdel 、 del----删除某个字段,删除整个hash
con.hdel(‘ok‘,age)#指定删除某个字段 ,成功的话返回1,否则返回0
con.del(‘ok‘)#删除整个hash
4、set
sadd---添加数据
con.sadd(‘mk‘,‘oi‘.‘Hwelo‘)
smembers ---- 查看所有数据
con.smembers(‘mk‘)
spop---随机删除一个元素
con.spop(‘con.spop(‘mk‘)‘)
srem--- 指定删除某个元素
con.srem(‘mk‘,‘oi‘)
5、sorted set
zadd---添加数据
con.zadd(‘set_f‘,{‘name‘:1.1,‘age‘:2,‘sex‘:3})
zrange ---- 查询数据
con.zrange(‘set_f‘,0,11)
zrem ---指定删除某个元素
con.zrem(‘set_f‘,‘age‘)
四、redis-py管道操作 piplines
管道是redis的子类,它支持在一个请求中款冲多个命令到服务器
管道使redis的读写速度更加的快速。秒级取值1000+的数据。
并且使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。
#1、创建一个管道
pipe = con.pipeline()
#缓冲多个命令
pipe.keys(‘*‘)
pipe.set(‘name‘,‘Mkl‘)
#执行命令
res = pipe.execute()#返回列表
print(res[0])
print(res[1])
管道的命令可以写在一起,如:
pipe.set(‘hello‘, ‘redis‘).sadd(‘faz‘, ‘baz‘).incr(‘num‘).execute()
print(r.get("name"))
print(r.get("role"))
print(r.get("num"))
以上是关于python redis连接 线程安全么的主要内容,如果未能解决你的问题,请参考以下文章
ConcurrentHashMap是如何保证线程安全的,你知道么?