基于python3封装的redis stream操作
Posted 5ycode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于python3封装的redis stream操作相关的知识,希望对你有一定的参考价值。
基于python3封装的redis stream操作。
主要有两块
-
基于redis封装的RedisService用来获取redis客户端
- 实现连接池
- 实现直连
-
基于哨兵封装的SentinelService继承自RedisService
- 重写获取连接池,增加连接池缓存(master不变使用缓存)
- 重写直连(根据哨兵解析出master节点)
-
基于常用的stream命令的封装RedisStreamService
第三个,可以自己扩展封装其他的命令
看下类图:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2022/4/02 7:35 下午
# @FileName: redisService.py
import redis,logging
from redis.client import *
from redis.sentinel import Sentinel
import json
class RedisService(object):
"""
直接操作redis
"""
def __init__(self,host=None,port=None,pwd=None,db=0,max_connections=10):
"""
redis链接初始化
:param host: redis实例的地址
:param port: 端口
:param pwd: 密码
:param db: 使用的db
:param max_connections: 连接池大小
"""
self.redis_pool = None
self.host = host
self.port = port
self.pwd = pwd
self.db = db
self.max_connections = max_connections
def __getPool(self,flag):
"""
获取连接池(主从切换以后把flag设置为True,重新初始化连接池)
:param flag: 为False不用重新获取,为True需要重新获取
:param max_connections: 连接池大小
:return:
"""
try:
if not flag:
return self.pool
# 缓存连接池
self.pool = redis.ConnectionPool(host=self.host, port=self.port, db=self.db, password=self.pwd, max_connections=self.max_connections)
return self.pool
except Exception as e:
logging.error(e)
raise e
def getRedisFromPool(self,flag = False) -> Redis:
"""
从链接池获取redis链接
:param flag: 是否重新获取(主从切换后)
:return:
"""
return redis.Redis(connection_pool=self.__getPool(flag))
def getRedis(self) -> Redis:
"""
直接获取redis链接
:return:
"""
if self.pwd:
return redis.Redis(host=self.host, port=self.port, db=self.db, password=self.pwd)
return redis.Redis(host=self.host, port=self.port, db=self.db)
class SentinelService(RedisService):
"""
通过哨兵操作redis
"""
def __init__(self,sentinel_nodes_str=None,service_name=None,pwd = None,max_connections=10):
"""
初始化哨兵服务
:param sentinel_nodes_str: 哨兵地址
:param service_name: 集群服务名称
:param pwd: 密码
:param max_connections:连接池大小
"""
super(SentinelService, self).__init__(pwd = pwd,max_connections=max_connections)
#缓存的master的地址(如果一台机器不同端口,请拼接)
self.master_host = None
self.sentinel_nodes_str = sentinel_nodes_str
self.service_name = service_name
self.sentinel_nodes = self.__parseNodesStr()
def __parseNodesStr(self):
"""
哨兵参数解析
:return:
"""
try:
list = []
for sentinelInfo in self.sentinel_nodes_str.split(','):
kv = sentinelInfo.split(":")
list.append((kv[0], kv[1]))
logging.info("from sentinel: %s", list)
return list
except Exception as e:
logging.error(e)
raise e
def __getMaster(self):
"""
从哨兵节点获取master节点
:return:
"""
# 链接哨兵节点
sentinel = Sentinel(self.sentinel_nodes, socket_timeout=0.1)
# 获取master节点
master = sentinel.discover_master(self.service_name)
if master:
# 赋值给RedisService
self.host = master[0]
self.port = master[1]
return master
def getRedisFromPool(self, flag=False) -> Redis:
"""
从链接池获取redis客户端
:param flag:
:return:
"""
# 初始化master节点
master = self.__getMaster()
# 没有主从切换,直接从连接池里获取
if self.master_host in master:
return super().getRedisFromPool()
# 主从切换了,更新缓存
self.master_host = master[0]
return super().getRedisFromPool(True)
def getRedis(self) -> Redis:
"""
直接获取redis客户端
:return:
"""
self.__getMaster()
return super().getRedis()
class RedisStreamService(object):
"""
redis stream封装
"""
def __init__(self,redis,stream_name,consumer_group):
"""
:param redis:redis的客户端
:param stream_name:stream的名称
:param consumer_group:消费组名称
"""
self.redis = redis
self.stream_name = stream_name
self.consumer_group = consumer_group
def __streamInit(self,data,id="0",maxlen=20000,target = None):
"""
初始化stream,上线之前手动调用下即可,不用在项目里调用
:param data: 业务数据 测试数据
:param id: 0 从开始消费, $ 从创建以后新进来的开始消费
:param maxlen: 队里最大长度
:return:
"""
if not self.redis.exists(self.stream_name):
"""
不存在消费者,直接创建消费者和消费组
"""
self.redis.xadd(self.stream_name, self.__dataWrap(data), maxlen=maxlen)
self.xgroup_create(id)
self.redis.xadd(self.stream_name, self.__dataWrap(data), maxlen=maxlen)
self.consumer("robot",count=2,target=target)
def xgroup_create(self,id):
"""
创建消费组
:param id:
:return:
"""
self.redis.xgroup_create(self.stream_name, self.consumer_group, id=id)
def __dataWrap(self,data) -> dict:
"""
包装数据
:param data:
:return:
"""
return "bizData":json.dumps(data)
def xack(self,msgId):
"""
ack
:param msgId:
:return:
"""
self.redis.xack( self.stream_name, self.consumer_group, msgId)
def __getBizData(self,data):
"""
从消息流中获取业务数据
:param item:
:return:
"""
if not data or not data[0]:
return None, None
msgId = str(data[0], 'utf-8')
data = str(key, 'utf-8'): str(val, 'utf-8') for key, val in data[1].items()
return msgId, data["bizData"]
def add(self,data):
"""
新增数据
:param stream_name:
:param data:
:return:
"""
self.redis.xadd( self.stream_name, self.__dataWrap(data))
def consumer(self,consumer_name,id=">",block=60000, count=1,target=None):
"""
消费数据
:param consumer_name: 消费者名称,建议传递ip
:param id: 从哪开始消费
:param block: 无消息阻塞时间,毫秒,默认60秒,在60秒内有消息直接消费
:param count: 消费多少条,默认1
:param target: 业务处理回调方法
:return:
"""
# block 0 时阻塞等待, 其他数值表示读取超时时间
streams = self.stream_name: id
rst = self.redis.xreadgroup( self.consumer_group, consumer_name, streams, block=block, count=count)
print(f'消费到的数据 rst')
if not rst or not rst[0] or not rst[0][1]:
return None
# 遍历获取到的列表信息(可以消费多条,根据count)
for item in rst[0][1]:
try:
#解析数据
msgId, data = self.__getBizData(item)
"""
执行回调函数target,成功后ack
"""
if target and target(msgId,data):
# 将处理完成的消息标记,类似于kafka的offset
self.redis.xack( self.stream_name, self.consumer_group, msgId)
except Exception as e:
# 消费失败,下次从头消费(消费成功的都已经提交ack了,可以先不处理,以后再处理)
logging.error("consumer is error:",e)
真实使用场景
def biz_execute(msgId,data):
"""
业务处理,不建议多个场景共用一个stream,建议分开,
如果数据量比较少,通过工厂处理分发
:param msgId:
:param data:
:return:
"""
print(f'业务执行msgId=msgId bizData=data')
return True
host = "127.0.0.1"
port = 6379
pwd = "123456"
#集群名称
service_name = "sentinel-test"
#哨兵节点
sentinel_nodes_str = "10.21.41.1:6381,10.21.41.2:6382,10.21.41.3:6381"
# stream对应的key
stream_name = "stream:bizName"
# 消费组
consumer_group = "biz_group"
def subscribe():
#基于redis的
service = RedisService(host=host, port=port, pwd=pwd)
#基于哨兵的
# service = SentinelService(sentinel_nodes_str, service_name, pwd=pwd)
while True:
r = service.getRedisFromPool()
stream = RedisStreamService(r, stream_name, consumer_group)
stream.consumer("当前ip",target=biz_execute)
#起一个后台线程执行消费,防止阻塞主线程
from threading import *
t1 = Thread(target=subscribe)
t1.start()
#测试代码(测试和后台线程二选一)
def test():
service = RedisService(host=host,port=port,pwd=pwd)
r = service.getRedisFromPool()
stream = RedisStreamService(r,stream_name,consumer_group)
data = "key1":"init@init","key2":"val2"
stream.__streamInit(data,id="0",maxlen=10,target=biz_execute)
# service = SentinelService(sentinel_nodes_str,service_name,pwd = pwd)
# r = service.getRedisFromPool()
if __name__ == '__main__':
test()
输出
python3 redisService.py
消费到的数据 [[b'stream:bizName', [(b'1648806192012-0', b'bizData': b'"key1": "init@init", "key2": "val2"'), (b'1648806192012-1', b'bizData': b'"key1": "xxxxy2": "val2"')]]]
业务执行msgId=1648806192012-0 bizData="key1": "init@init", "key2": "val2"
业务执行msgId=1648806192012-1 bizData="key1": "xxxx", "key2": "val2"
以上是关于基于python3封装的redis stream操作的主要内容,如果未能解决你的问题,请参考以下文章