基于python3封装的redis stream操作

Posted 5ycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于python3封装的redis stream操作相关的知识,希望对你有一定的参考价值。

基于python3封装的redis stream操作。
主要有两块

  • 基于redis封装的RedisService用来获取redis客户端

    • 实现连接池
    • 实现直连
  • 基于哨兵封装的SentinelService继承自RedisService

    • 重写获取连接池,增加连接池缓存(master不变使用缓存)
    • 重写直连(根据哨兵解析出master节点)
  • 基于常用的stream命令的封装RedisStreamService

第三个,可以自己扩展封装其他的命令

看下类图:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2022/4/02 7:35 下午
# @FileName: redisService.py

import redis,logging
from redis.client import *
from redis.sentinel import Sentinel
import json

class RedisService(object):
    """
    直接操作redis
    """
    def __init__(self,host=None,port=None,pwd=None,db=0,max_connections=10):
        """
        redis链接初始化
        :param host: redis实例的地址
        :param port:  端口
        :param pwd: 密码
        :param db:  使用的db
        :param max_connections: 连接池大小
        """
        self.redis_pool = None
        self.host = host
        self.port = port
        self.pwd = pwd
        self.db = db
        self.max_connections = max_connections

    def __getPool(self,flag):
        """
        获取连接池(主从切换以后把flag设置为True,重新初始化连接池)
        :param flag: 为False不用重新获取,为True需要重新获取
        :param max_connections: 连接池大小
        :return:
        """
        try:
            if not flag:
                return self.pool
            # 缓存连接池   
            self.pool =  redis.ConnectionPool(host=self.host, port=self.port, db=self.db, password=self.pwd, max_connections=self.max_connections)
            return self.pool
        except Exception as e:
            logging.error(e)
            raise e
    def getRedisFromPool(self,flag = False) -> Redis:
        """
        从链接池获取redis链接
        :param flag: 是否重新获取(主从切换后)
        :return:
        """
        return redis.Redis(connection_pool=self.__getPool(flag))
    def getRedis(self) -> Redis:
        """
        直接获取redis链接
        :return:
        """
        if self.pwd:
            return redis.Redis(host=self.host, port=self.port, db=self.db, password=self.pwd)
        return redis.Redis(host=self.host, port=self.port, db=self.db)


class SentinelService(RedisService):
    """
    通过哨兵操作redis
    """
    def __init__(self,sentinel_nodes_str=None,service_name=None,pwd = None,max_connections=10):
        """
        初始化哨兵服务
        :param sentinel_nodes_str: 哨兵地址
        :param service_name: 集群服务名称
        :param pwd:  密码
        :param max_connections:连接池大小
        """
        super(SentinelService, self).__init__(pwd = pwd,max_connections=max_connections)
        #缓存的master的地址(如果一台机器不同端口,请拼接)
        self.master_host = None
        self.sentinel_nodes_str = sentinel_nodes_str
        self.service_name = service_name
        self.sentinel_nodes = self.__parseNodesStr()

    def __parseNodesStr(self):
        """
        哨兵参数解析
        :return:
        """
        try:
            list = []
            for sentinelInfo in self.sentinel_nodes_str.split(','):
                kv = sentinelInfo.split(":")
                list.append((kv[0], kv[1]))
            logging.info("from sentinel: %s", list)
            return list
        except Exception as e:
            logging.error(e)
            raise e

    def __getMaster(self):
        """
        从哨兵节点获取master节点
        :return:
        """
        # 链接哨兵节点
        sentinel = Sentinel(self.sentinel_nodes, socket_timeout=0.1)
        # 获取master节点
        master = sentinel.discover_master(self.service_name)
        if master:
            # 赋值给RedisService
            self.host = master[0]
            self.port = master[1]
        return master

    def getRedisFromPool(self, flag=False) -> Redis:
        """
        从链接池获取redis客户端
        :param flag:
        :return:
        """
        # 初始化master节点
        master = self.__getMaster()
        # 没有主从切换,直接从连接池里获取
        if self.master_host in master:
            return super().getRedisFromPool()
        # 主从切换了,更新缓存
        self.master_host = master[0]
        return super().getRedisFromPool(True)
    def getRedis(self) -> Redis:
        """
        直接获取redis客户端
        :return:
        """
        self.__getMaster()
        return super().getRedis()

class RedisStreamService(object):
    """
    redis stream封装
    """
    def __init__(self,redis,stream_name,consumer_group):
        """
        :param redis:redis的客户端
        :param stream_name:stream的名称
        :param consumer_group:消费组名称
        """
        self.redis = redis
        self.stream_name = stream_name
        self.consumer_group = consumer_group

    def __streamInit(self,data,id="0",maxlen=20000,target = None):
        """
        初始化stream,上线之前手动调用下即可,不用在项目里调用
        :param data: 业务数据  测试数据
        :param id: 0 从开始消费, $ 从创建以后新进来的开始消费
        :param maxlen: 队里最大长度
        :return:
        """
        if not self.redis.exists(self.stream_name):
            """
            不存在消费者,直接创建消费者和消费组
            """
            self.redis.xadd(self.stream_name, self.__dataWrap(data), maxlen=maxlen)
            self.xgroup_create(id)
            self.redis.xadd(self.stream_name, self.__dataWrap(data), maxlen=maxlen)
        self.consumer("robot",count=2,target=target)
    def xgroup_create(self,id):
        """
        创建消费组
        :param id:
        :return:
        """
        self.redis.xgroup_create(self.stream_name, self.consumer_group, id=id)
    def __dataWrap(self,data) -> dict:
        """
        包装数据
        :param data:
        :return:
        """
        return  "bizData":json.dumps(data)
    def xack(self,msgId):
        """
        ack
        :param msgId:
        :return:
        """
        self.redis.xack( self.stream_name,  self.consumer_group, msgId)

    def __getBizData(self,data):
        """
        从消息流中获取业务数据
        :param item:
        :return:
        """
        if not data or not data[0]:
            return None, None
        msgId = str(data[0], 'utf-8')
        data = str(key, 'utf-8'): str(val, 'utf-8') for key, val in data[1].items()
        return msgId, data["bizData"]
    def add(self,data):
        """
        新增数据
        :param stream_name:
        :param data:
        :return:
        """
        self.redis.xadd( self.stream_name, self.__dataWrap(data))
    def consumer(self,consumer_name,id=">",block=60000, count=1,target=None):
        """
        消费数据
        :param consumer_name: 消费者名称,建议传递ip
        :param id: 从哪开始消费
        :param block: 无消息阻塞时间,毫秒,默认60秒,在60秒内有消息直接消费
        :param count: 消费多少条,默认1
        :param target: 业务处理回调方法
        :return:
        """
        # block 0 时阻塞等待, 其他数值表示读取超时时间
        streams = self.stream_name: id
        rst = self.redis.xreadgroup( self.consumer_group, consumer_name, streams, block=block, count=count)
        print(f'消费到的数据 rst')
        if not rst or not rst[0] or not rst[0][1]:
           return None
        # 遍历获取到的列表信息(可以消费多条,根据count)
        for item in rst[0][1]:
           try:
               #解析数据
               msgId, data = self.__getBizData(item)
               """
               执行回调函数target,成功后ack
               """
               if target and target(msgId,data):
                   # 将处理完成的消息标记,类似于kafka的offset
                   self.redis.xack( self.stream_name,  self.consumer_group, msgId)
           except Exception as e:
               # 消费失败,下次从头消费(消费成功的都已经提交ack了,可以先不处理,以后再处理)
               logging.error("consumer is error:",e)

真实使用场景

def biz_execute(msgId,data):
    """
    业务处理,不建议多个场景共用一个stream,建议分开,
    如果数据量比较少,通过工厂处理分发
    :param msgId:
    :param data:
    :return:
    """
    print(f'业务执行msgId=msgId bizData=data')
    return True

host = "127.0.0.1"
port = 6379
pwd = "123456"
#集群名称
service_name = "sentinel-test"
#哨兵节点
sentinel_nodes_str = "10.21.41.1:6381,10.21.41.2:6382,10.21.41.3:6381"
# stream对应的key
stream_name = "stream:bizName"
# 消费组
consumer_group = "biz_group"
def subscribe():
    #基于redis的
    service = RedisService(host=host, port=port, pwd=pwd)
    #基于哨兵的
    # service = SentinelService(sentinel_nodes_str, service_name, pwd=pwd)
    while True:
        r = service.getRedisFromPool()
        stream = RedisStreamService(r, stream_name, consumer_group)
        stream.consumer("当前ip",target=biz_execute)

#起一个后台线程执行消费,防止阻塞主线程
from threading import *
t1 = Thread(target=subscribe)
t1.start()


#测试代码(测试和后台线程二选一)
def test():
    service = RedisService(host=host,port=port,pwd=pwd)
    r = service.getRedisFromPool()
    stream =  RedisStreamService(r,stream_name,consumer_group)
    data = "key1":"init@init","key2":"val2"
    stream.__streamInit(data,id="0",maxlen=10,target=biz_execute)
    # service = SentinelService(sentinel_nodes_str,service_name,pwd = pwd)
    # r = service.getRedisFromPool()
if __name__ == '__main__':
    test()

输出

 python3 redisService.py
消费到的数据 [[b'stream:bizName', [(b'1648806192012-0', b'bizData': b'"key1": "init@init", "key2": "val2"'), (b'1648806192012-1', b'bizData': b'"key1": "xxxxy2": "val2"')]]]
业务执行msgId=1648806192012-0 bizData="key1": "init@init", "key2": "val2"
业务执行msgId=1648806192012-1 bizData="key1": "xxxx", "key2": "val2"

以上是关于基于python3封装的redis stream操作的主要内容,如果未能解决你的问题,请参考以下文章

使用java jedis封装Redis Stream操作案例

基于Stream的Redis消息队列

Redis基于(ListPubSubStream消费者组)实现消息队列,基于Stream结构实现异步秒杀下单

Redis基于Stream的消息队列 - 消费者组模式

Redis进阶学习04---秒杀优化和消息队列

Redis的安装和Redis的基本数据库操作实操详解