先说结论。使用连接池的情况下,每一条Redis命令都将从连接池中获得一个连接,执行完后随即回收。因此在做切库操作时,使用Pipline来必须保证前后几条命令在同一个库执行。
一,现象
某个微服务中,我们的Redis key 集中在11库,因此连接池的默认库为11。由于历史原因,当需要获取设备信息时,需要切换到1库。
最初代码如下:
单独请求这个路由时,完全没有任何问题。一切按照预期的执行。
但在并发的时候,一些原来在默认库的操作未能取得正确结果,通过查看日志发现,Redis库被切到1了,因此导致错误。
二,推演过程
既然被切库了,一定是在某一时刻将切库的后连接放回了资源池。
最初误认为一个请求周期使用的是同一个连接池。
通过调试发现,Redis的每一个命令都会重新取得一个连接,执行后立即回收,而且回收到资源池的顺序类似于堆。
问题重现:执行切库到1,回收到资源池。当另一个使用默认库的请求刚好拿到这个切换到1库的连接,继续执行11库的操作,发生错误。
三,解决方案
定位到问题以后,我们要做的就是怎样保持切库前后的操作都使用同一个连接,Redis提供的Pipline刚好可以完成这样的操作。
改造后的代码如下:
rdb := models.RedisCon
pipe := rdb.Pipeline()
k := "device:" + udid
pipe.Do("select", 1)
_, _ = pipe.Get(k).Result()
pipe.Do("select", 11)
cmders, err := pipe.Exec()
如果需要获取执行后的结果,还需要解析
strMap := redis.GetCmdResult(cmders)
did, _ = strMap[1].(string)
func GetCmdResult(cmders []redis.Cmder) map[int]interface{}
func GetCmdResult(cmders []redis.Cmder) map[int]interface{} {
strMap := make(map[int]interface{}, len(cmders))
for idx, cmder := range cmders {
//*ClusterSlotsCmd 未实现
switch reflect.TypeOf(cmder).String() {
case "*redis.Cmd":
cmd := cmder.(*redis.Cmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringCmd":
cmd := cmder.(*redis.StringCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.SliceCmd":
cmd := cmder.(*redis.SliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringSliceCmd":
cmd := cmder.(*redis.StringSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringStringMapCmd":
cmd := cmder.(*redis.StringStringMapCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringIntMapCmd":
cmd := cmder.(*redis.StringIntMapCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.BoolCmd":
cmd := cmder.(*redis.BoolCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.BoolSliceCmd":
cmd := cmder.(*redis.BoolSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.IntCmd":
cmd := cmder.(*redis.IntCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.FloatCmd":
cmd := cmder.(*redis.FloatCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StatusCmd":
cmd := cmder.(*redis.StatusCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.TimeCmd":
cmd := cmder.(*redis.TimeCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.DurationCmd":
cmd := cmder.(*redis.DurationCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringStructMapCmd":
cmd := cmder.(*redis.StringStructMapCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XMessageSliceCmd":
cmd := cmder.(*redis.XMessageSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XStreamSliceCmd":
cmd := cmder.(*redis.XStreamSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XPendingCmd":
cmd := cmder.(*redis.XPendingCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XPendingExtCmd":
cmd := cmder.(*redis.XPendingExtCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.ZSliceCmd":
cmd := cmder.(*redis.ZSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.ZWithKeyCmd":
cmd := cmder.(*redis.ZWithKeyCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.CommandsInfoCmd":
cmd := cmder.(*redis.CommandsInfoCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.GeoLocationCmd":
cmd := cmder.(*redis.GeoLocationCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.GeoPosCmd":
cmd := cmder.(*redis.GeoPosCmd)
strMap[idx], _ = cmd.Result()
break
}
}
return strMap
}
更新,用这种新的取值方式更方便