redis高级用法
Posted Go浪研习社
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis高级用法相关的知识,希望对你有一定的参考价值。
redis高级用法
Redis6.0已经低调的发布了稳定版,最大的变化就是支持I/O多线程,但旧版本就真的是单线程吗,事情往往不是这么简单,
这里的单线程指的是只有一个执行线程串行处理命令,再配合多路复用机制,实际上数据持久化、主从同步、连接释放等都
有其他线程来处理。既然6.0都出来了,之前的文章我也说了不少Redis相关的了,借此契机,我们就来看看Redis的一些高级用法吧。
目标:
-
Pipeline管道的使用 -
位图bitmap使用 -
Redis红锁原理 -
Redis事务 -
Lua脚本
1.Pipeline管道的使用
redis客户端执行一条命令分4个过程:
发送命令-〉命令排队-〉命令执行-〉返回结果
这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题
-
使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。
-
高频命令场景下,应避免使用管道,因为需要先将全部执行命令放入管道,会耗时。另外,需要使用返回值的情况也不建议使用,
同步所有管道返回结果也是个耗时的过程!管道无法提供原子性/事务保障。
-
非pipeline和pipeline两者性能对比,明显看出使用pipeline性能要快很多
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
)
func main() {
redisDB := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
var (
zreCmd *redis.Cmd
ZscCmd *redis.Cmd
myScore int64
myGouLiang int64
)
if _, err := redisDB.Pipelined(func(pipe redis.Pipeliner) error {
zreCmd = pipe.Do("Incrby", "aaaaa", "1")
ZscCmd = pipe.Do("Incrby", "bbbbb", "2")
return nil
}); err != nil && err != redis.Nil {
return
}
if zreCmd.Err() == nil {
myScore, _ = zreCmd.Int64()
myScore += 1
}
if ZscCmd.Err() == nil {
myGouLiang, _ = ZscCmd.Int64()
}
fmt.Println(myGouLiang)
fmt.Println(myScore)
}
2.分布式锁
首先,一个好的分布式锁,应该具有以下特征:
-
互斥——同时刻只能有一个持有者; -
可重入——同一个持有者可多次进入; -
阻塞——多个锁请求者时,未获得锁的阻塞等待; -
无死锁——持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取; -
容错——只要大部分节点正常,仍然可以获取和释放锁;
单Redis实例下,通过SETNX命令来实现Redis分布式锁,已不推荐使用,如果说使用SET命令配合NX参数或许会让面试官更为满意,
因为SET命令可带有EX/PX/NX/XX
参数,更为强大,可以完成更复杂业务,但这些在此不表,我想说的是分布式多实例下的红锁算法:
在Redis的分布式环境中,我们假设有N(建议为5)个Redis master,这些节点完全互相独立,不存在主从复制或者其他集群协调机制。
获取一个红锁的步骤如下:
-
获取当前系统时间,毫秒为单位 -
依次(同时并发地)尝试从N个实例,使用相同的Key和随机值(全局唯一)获取锁 -
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)即为获取锁消耗的时间。当且仅当从大多数( N/2+1
)的Redis节点都获取到锁,并且消耗时间小于锁失效时间时,红锁才算获取成功 -
如果获取到了红锁,key真正的有效时间等于有效时间减去获取锁消耗的时间(步骤3中的计算结果) -
如果获取红锁失败(没有在至少 N/2+1
个Redis实例取到锁或者取锁时间超过了锁的有效时间),客户端应该在所有的Redis实例上进行解锁(即使某些Redis实例根本就没有加锁成功)
注:
A. 锁指单Redis实例上使用如SET命令获取的锁,红锁是将多个单Redis实例锁组合为一个锁来管理,从而避免单点故障
B. 步骤2中,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,
则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,
客户端应该尽快尝试另外一个Redis实例。
C. 当客户端无法取到红锁时,应该在一个随机延迟后重试,防止多个客户端在同时抢夺同一资源的锁(这样会导致脑裂,没有人会取到锁),
并且当客户端从大多数Redis实例获取锁失败时,应该尽快地释放(部分)已经成功取到的锁,这样其他的客户端就不必非得等到锁过完“有效时间”才能取到,
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
"time"
)
type redisClient redis.Client
func connRedisCluster(address []string, password string) *redis.ClusterClient {
conf := redis.ClusterOptions{
Addrs: address,
Password: password,
}
return redis.NewClusterClient(&conf)
}
func connRedisSingle(addr, password string) *redis.Client {
conf := redis.Options{
Addr: addr,
Password: password,
}
return redis.NewClient(&conf)
}
func (r *redisClient) lock(value string) (error, bool) {
ret := r.SetNX("hello", value, time.Second*10)
if err := ret.Err(); err != nil {
fmt.Printf("set value %s error: %v\n", value, err)
return err, false
}
return nil, ret.Val()
}
func (r *redisClient) unlock() bool {
ret := r.Del("hello")
if err := ret.Err(); err != nil {
fmt.Println("unlock error: ", err)
return false
}
return true
}
func (r *redisClient) retryLock() bool {
ok := false
for !ok {
err, t := r.getTTL()
if err != nil {
return false
}
if t > 0 {
fmt.Printf("锁被抢占, %f 秒后重试...\n", (t / 10).Seconds())
time.Sleep(t / 10)
}
err, ok = r.lock("Jan")
if err != nil {
return false
}
}
return ok
}
func (r *redisClient) getLock() (error, string) {
ret := r.Get("hello")
if err := ret.Err(); err != nil {
fmt.Println("get lock error: ", err)
return err, ""
}
rt, _ := ret.Bytes()
return nil, string(rt)
}
// 获取锁的过期剩余时间
func (r *redisClient) getTTL() (error, time.Duration) {
ret := r.TTL("hello")
if err := ret.Err(); err != nil {
fmt.Println("get TTL error: ", err)
return err, 0
}
return nil, ret.Val()
}
func (r *redisClient) threadLock(threadId string) {
for {
err, _ := r.getLock()
if err != nil && err.Error() == "redis: nil" {
// 没有获取到值,说明目前没有人持有锁
fmt.Printf("线程 %s 开始加锁\n", threadId)
err, ok := r.lock("Jan")
if err != nil {
return
}
if !ok {
if !r.retryLock() {
fmt.Printf("线程 %s 加锁失败\n", threadId)
return
}
}
fmt.Printf("线程 %s 已加锁\n", threadId)
// 加锁后执行相应操作
time.Sleep(5 * time.Second)
// 释放锁
r.unlock()
fmt.Printf("线程 %s 已释放锁\n", threadId)
return
} else if err != nil {
return
}
err, t := r.getTTL()
if err != nil {
return
}
if t > 0 {
fmt.Printf("线程 %s 锁被占用, %f 秒后重试\n", threadId, (t / 10).Seconds())
time.Sleep(t / 10)
}
}
}
func main() {
var r redisClient
address := "127.0.0.1:6379"
cl := connRedisSingle(address, "")
defer cl.Close()
r = redisClient(*cl)
// 线程1获取锁
go r.threadLock("1")
// time.Sleep(10 * time.Millisecond)
// 线程2获取锁
go r.threadLock("2")
select {}
}
3.位图
Redis中有一种特殊的存储类型,二进制位对象——位图(bitmap),存储上是按照”字符串”处理,如果看官知晓布隆过滤器,就应该十分熟悉位图了,其特点很明显,
就是每位只有0/1两种状态值,如下图示例,然后就是体积小,10M可存储8千万bit位,最大允许512M,可存储40亿bit位。
Redis-cli中只有6个命令:
-
SETBIT:设置或清除指定偏移量上的位(bit)。当 key 不存在时,自动生成一个新的字符串值。字符串会进行伸展(grown)以确保它可以将 value 保存在指定的偏移量上。当字符串值进行伸展时,空白位置以 0 填充。 -
GETBIT:对 key 所储存的字符串值,获取指定偏移量上的位(bit)。当 offset 比字符串值的长度大,或者 key 不存在时,返回 0。 -
BITCOUNT:计算给定字符串中,被设置为 1 的比特位的数量。对一个不存在的 key 进行 BITCOUNT 操作,结果为 0 。 -
BITPOS:返回位图中第一个值为 bit 的二进制位的位置。 -
BITTOP:对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。 -
BITFIELD:将一个 Redis 字符串看作是一个由二进制位组成的数组, 并对这个数组中储存的长度不同的整数进行访问。
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
)
func main() {
redisDB := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
var (
getBitInt *redis.IntCmd
andBitInt *redis.IntCmd
posBitInt *redis.IntCmd
countBitInt *redis.IntCmd
countBit *redis.BitCount
)
redisDB.SetBit("bit_key", 1000, 1)
getBitInt = redisDB.GetBit("bit_key", 1000)
countBitInt = redisDB.BitCount("bit_key", countBit)
// 对"bit_key1","bit_key2"做AND位运算,并保存到"dest_key"中
andBitInt = redisDB.BitOpAnd("dest_key", "bit_key1", "bit_key2")
// redisDB.BitPos("bit_key",false);
posBitInt = redisDB.BitPos("bit_key", 1, 2)
fmt.Println(getBitInt)
fmt.Println(andBitInt)
fmt.Println(countBitInt)
fmt.Println(posBitInt)
}
4.事务
Redis使用MULTI标记一个事务块的开始。事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性(atomic)地执行。
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
"strings"
"time"
)
func main() {
redisDB := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
// 开启一个TxPipeline事务
pipe := redisDB.TxPipeline()
defer pipe.Close()
// 这里是放命令
pipe.SetNX("Freeze:DL201544a00007:a00002:a1:300", "200", 30*time.Second)
pipe.SetNX("Freeze:DL201544a00008:a00002:a1:400", "400", 30*time.Second)
pipe.SetNX("Freeze:DL201544a00009:a00002:a1:500", "500", 30*time.Second)
pipe.SetNX("Freeze:DL201544a0000991123:a000091123:a1:592100", "5923100", 30*time.Second)
// 通过Exec函数提交redis事务
r, err := pipe.Exec()
if err != nil {
// 取消提交
pipe.Discard()
}
// 这里调用exec执行刚刚加的命令,redis的事务和mysql事务是不一样的,一般情况下,这里的err出错是在编译期间出错,运行期间是很少出错的
// mysql的事务是具有原子性,一致性,隔离性 ,持久性
// redis事务三阶段:
//
// 开启:以MULTI开始一个事务
// 入队:将多个命令入队到事务中,接到这些命令并不会立即执行,而是放到等待执行的事务队列里面
// 执行:由EXEC命令触发事务
// redis事务三大特性:
//
// 单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
// 没有隔离级别的概念:队列中的命令没有提交之前都不会实际的被执行,因为事务提交前任何指令都不会被实际执行,也就不存在”事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头痛的问题
// 不保证原子性:redis同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
//
// 所以,如果出现,第一条数据处理是true,其他的处理是false,那需要把其中的为true的数据,撤销其操作,这里也只能手动去撤销
var resultmap []map[string]string
resultmap = make([]map[string]string, 0)
for _, v := range r {
params := fmt.Sprintf("%s", v)
res := strings.Split(params, " ")
fmt.Println("key=", res[1])
// 处理结果
fmt.Println("res=", res[6])
if res[6] == "true" {
var model map[string]string
model = make(map[string]string, 0)
model["key"] = res[1]
model["result"] = res[6]
resultmap = append(resultmap, model)
}
}
// 这堆代码是为了事务处理结果不一致导致的问题
if len(r) != len(resultmap) {
for _, vb := range resultmap {
redisDB.Del(vb["key"]).Result()
}
}
}
5.Lua脚本
Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,
不会有其他脚本或 Redis 命令被执行。这和使用 MULTI / EXEC 包围的事务很类似。
Redis-cli模式,命令行格式:
EVAL script numkeys key [key …] arg [arg …]
Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问键名参数;不是键名参数的附加参数 arg [arg ...] ,可以在 Lua 中通过全局变量 ARGV 数组访问。
在 Lua 脚本中,可以使用两个不同函数来执行 Redis 命令,它们分别是:redis.call()和redis.pcall(),这两个函数的唯一区别在于它们使用不同的方式处理
执行命令所产生的异常。对于Lua脚本,还有SCRIPT 命令进行管理,在此不表。
具体代码:
package main
import (
"github.com/garyburd/redigo/redis"
)
const SCRIPT_VOICE2 = `
local voiceGiftNumKey = tostring(KEYS[1])
local voiceTemKey = tostring(KEYS[2])
local giftCom = tostring(ARGV[1])
local gnum = tonumber(ARGV[2])
local guest = tonumber(ARGV[3])
local tmpInfomarshal = tostring(ARGV[4])
local keyValidity = tonumber(ARGV[5])
if giftCom ~= 'A' and giftCom ~= 'B' and giftCom ~= 'C'
then
return 3
end
redis.call('hincrby',voiceGiftNumKey,giftCom,gnum)
local giftNumArray = redis.call('HMGET',voiceGiftNumKey,'A','B','C')
local minGiftNum = 99999
for i = 1,3 do
local tmpNum = 0
if giftNumArray[i] ~= false
then
tmpNum = tonumber(giftNumArray[i])
end
if tmpNum <= 0
then
return 2
else
if tmpNum< minGiftNum
then
minGiftNum = tmpNum
end
end
end
local newArray={}
for i = 1,3 do
local tmpNum2 = tonumber(giftNumArray[i])
newArray[i]=tmpNum2-minGiftNum
end
redis.call('HMSET',voiceGiftNumKey,'A',newArray[1],'B',newArray[2],'C',newArray[3])
if keyValidity > 0
then
local voice= redis.call('SET',voiceTemKey,tmpInfomarshal,'EX',keyValidity,'NX')
if voice ~= false
then
return 1
end
end
return 0
`
var voiceScript2 = redis.NewScript(2, SCRIPT_VOICE2)
func main() {
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return
}
defer c.Close()
reply, err := redis.Int(voiceScript2.Do(c, "Voice:RoomID:{686}:SeatId:{7c871778e2c011ea98b600163e012514}:TemInfo", "voiceGiftNum:{86}:{86}", "A", 1, 86))
if reply == 0 {
return
}
}
以上是关于redis高级用法的主要内容,如果未能解决你的问题,请参考以下文章