redis高级用法

Posted Go浪研习社

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis高级用法相关的知识,希望对你有一定的参考价值。

redis高级用法

Redis6.0已经低调的发布了稳定版,最大的变化就是支持I/O多线程,但旧版本就真的是单线程吗,事情往往不是这么简单,

这里的单线程指的是只有一个执行线程串行处理命令,再配合多路复用机制,实际上数据持久化、主从同步、连接释放等都

有其他线程来处理。既然6.0都出来了,之前的文章我也说了不少Redis相关的了,借此契机,我们就来看看Redis的一些高级用法吧。

目标:

  1. Pipeline管道的使用
  2. 位图bitmap使用
  3. Redis红锁原理
  4. Redis事务
  5. Lua脚本

1.Pipeline管道的使用

redis客户端执行一条命令分4个过程:

  发送命令-〉命令排队-〉命令执行-〉返回结果

这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题

  • 使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。

  • 高频命令场景下,应避免使用管道,因为需要先将全部执行命令放入管道,会耗时。另外,需要使用返回值的情况也不建议使用,

    同步所有管道返回结果也是个耗时的过程!管道无法提供原子性/事务保障。

  • 非pipeline和pipeline两者性能对比,明显看出使用pipeline性能要快很多

file

具体代码:

package main

import (
 "fmt"
 "github.com/go-redis/redis"
)

func main() {
 redisDB := redis.NewClient(&redis.Options{
  Addr:     "127.0.0.1:6379",
  Password: "",
  DB:       0,
 })

 var (
  zreCmd     *redis.Cmd
  ZscCmd     *redis.Cmd
  myScore    int64
  myGouLiang int64
 )
 if _, err := redisDB.Pipelined(func(pipe redis.Pipeliner) error {
  zreCmd = pipe.Do("Incrby""aaaaa""1")
  ZscCmd = pipe.Do("Incrby""bbbbb""2")
  return nil
 }); err != nil && err != redis.Nil {
  return
 }

 if zreCmd.Err() == nil {
  myScore, _ = zreCmd.Int64()
  myScore += 1
 }
 if ZscCmd.Err() == nil {
  myGouLiang, _ = ZscCmd.Int64()
 }

 fmt.Println(myGouLiang)
 fmt.Println(myScore)
}

2.分布式锁

首先,一个好的分布式锁,应该具有以下特征:

  • 互斥——同时刻只能有一个持有者;
  • 可重入——同一个持有者可多次进入;
  • 阻塞——多个锁请求者时,未获得锁的阻塞等待;
  • 无死锁——持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取;
  • 容错——只要大部分节点正常,仍然可以获取和释放锁;

单Redis实例下,通过SETNX命令来实现Redis分布式锁,已不推荐使用,如果说使用SET命令配合NX参数或许会让面试官更为满意,

因为SET命令可带有EX/PX/NX/XX参数,更为强大,可以完成更复杂业务,但这些在此不表,我想说的是分布式多实例下的红锁算法:

在Redis的分布式环境中,我们假设有N(建议为5)个Redis master,这些节点完全互相独立,不存在主从复制或者其他集群协调机制。

获取一个红锁的步骤如下:

  1. 获取当前系统时间,毫秒为单位
  2. 依次(同时并发地)尝试从N个实例,使用相同的Key和随机值(全局唯一)获取锁
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)即为获取锁消耗的时间。当且仅当从大多数( N/2+1)的Redis节点都获取到锁,并且消耗时间小于锁失效时间时,红锁才算获取成功
  4. 如果获取到了红锁,key真正的有效时间等于有效时间减去获取锁消耗的时间(步骤3中的计算结果)
  5. 如果获取红锁失败(没有在至少 N/2+1个Redis实例取到锁或者取锁时间超过了锁的有效时间),客户端应该在所有的Redis实例上进行解锁(即使某些Redis实例根本就没有加锁成功)

注:

A. 锁指单Redis实例上使用如SET命令获取的锁,红锁是将多个单Redis实例锁组合为一个锁来管理,从而避免单点故障

B. 步骤2中,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,

则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,

客户端应该尽快尝试另外一个Redis实例。

C. 当客户端无法取到红锁时,应该在一个随机延迟后重试,防止多个客户端在同时抢夺同一资源的锁(这样会导致脑裂,没有人会取到锁),

并且当客户端从大多数Redis实例获取锁失败时,应该尽快地释放(部分)已经成功取到的锁,这样其他的客户端就不必非得等到锁过完“有效时间”才能取到,

具体代码:

package main

import (
 "fmt"
 "github.com/go-redis/redis"
 "time"
)

type redisClient redis.Client

func connRedisCluster(address []string, password string) *redis.ClusterClient {
 conf := redis.ClusterOptions{
  Addrs:    address,
  Password: password,
 }
 return redis.NewClusterClient(&conf)
}

func connRedisSingle(addr, password string) *redis.Client {
 conf := redis.Options{
  Addr:     addr,
  Password: password,
 }
 return redis.NewClient(&conf)
}

func (r *redisClient) lock(value string) (error, bool) {
 ret := r.SetNX("hello", value, time.Second*10)
 if err := ret.Err(); err != nil {
  fmt.Printf("set value %s error: %v\n", value, err)
  return err, false
 }
 return nil, ret.Val()
}

func (r *redisClient) unlock() bool {
 ret := r.Del("hello")
 if err := ret.Err(); err != nil {
  fmt.Println("unlock error: ", err)
  return false
 }
 return true
}

func (r *redisClient) retryLock() bool {
 ok := false
 for !ok {
  err, t := r.getTTL()
  if err != nil {
   return false
  }
  if t > 0 {
   fmt.Printf("锁被抢占, %f 秒后重试...\n", (t / 10).Seconds())
   time.Sleep(t / 10)
  }
  err, ok = r.lock("Jan")
  if err != nil {
   return false
  }
 }
 return ok
}

func (r *redisClient) getLock() (error, string) {
 ret := r.Get("hello")
 if err := ret.Err(); err != nil {
  fmt.Println("get lock error: ", err)
  return err, ""
 }
 rt, _ := ret.Bytes()
 return nil, string(rt)
}

// 获取锁的过期剩余时间
func (r *redisClient) getTTL() (error, time.Duration) {
 ret := r.TTL("hello")
 if err := ret.Err(); err != nil {
  fmt.Println("get TTL error: ", err)
  return err, 0
 }
 return nil, ret.Val()
}

func (r *redisClient) threadLock(threadId string) {
 for {
  err, _ := r.getLock()
  if err != nil && err.Error() == "redis: nil" {
   // 没有获取到值,说明目前没有人持有锁
   fmt.Printf("线程 %s 开始加锁\n", threadId)
   err, ok := r.lock("Jan")
   if err != nil {
    return
   }
   if !ok {
    if !r.retryLock() {
     fmt.Printf("线程 %s 加锁失败\n", threadId)
     return
    }
   }
   fmt.Printf("线程 %s 已加锁\n", threadId)
   // 加锁后执行相应操作
   time.Sleep(5 * time.Second)
   // 释放锁
   r.unlock()
   fmt.Printf("线程 %s 已释放锁\n", threadId)
   return
  } else if err != nil {
   return
  }
  err, t := r.getTTL()
  if err != nil {
   return
  }
  if t > 0 {
   fmt.Printf("线程 %s 锁被占用, %f 秒后重试\n", threadId, (t / 10).Seconds())
   time.Sleep(t / 10)
  }
 }
}

func main() {
 var r redisClient
 address := "127.0.0.1:6379"
 cl := connRedisSingle(address, "")
 defer cl.Close()
 r = redisClient(*cl)
 // 线程1获取锁
 go r.threadLock("1")
 // time.Sleep(10 * time.Millisecond)
 // 线程2获取锁
 go r.threadLock("2")
 select {}
}

3.位图

Redis中有一种特殊的存储类型,二进制位对象——位图(bitmap),存储上是按照”字符串”处理,如果看官知晓布隆过滤器,就应该十分熟悉位图了,其特点很明显,

就是每位只有0/1两种状态值,如下图示例,然后就是体积小,10M可存储8千万bit位,最大允许512M,可存储40亿bit位。

Redis-cli中只有6个命令:

  • SETBIT:设置或清除指定偏移量上的位(bit)。当 key 不存在时,自动生成一个新的字符串值。字符串会进行伸展(grown)以确保它可以将 value 保存在指定的偏移量上。当字符串值进行伸展时,空白位置以 0 填充。
  • GETBIT:对 key 所储存的字符串值,获取指定偏移量上的位(bit)。当 offset 比字符串值的长度大,或者 key 不存在时,返回 0。
  • BITCOUNT:计算给定字符串中,被设置为 1 的比特位的数量。对一个不存在的 key 进行 BITCOUNT 操作,结果为 0 。
  • BITPOS:返回位图中第一个值为 bit 的二进制位的位置。
  • BITTOP:对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。
  • BITFIELD:将一个 Redis 字符串看作是一个由二进制位组成的数组, 并对这个数组中储存的长度不同的整数进行访问。

具体代码:

package main

import (
 "fmt"
 "github.com/go-redis/redis"
)

func main() {
 redisDB := redis.NewClient(&redis.Options{
  Addr:     "127.0.0.1:6379",
  Password: "",
  DB:       0,
 })
 var (
  getBitInt   *redis.IntCmd
  andBitInt   *redis.IntCmd
  posBitInt   *redis.IntCmd
  countBitInt *redis.IntCmd
  countBit    *redis.BitCount
 )

 redisDB.SetBit("bit_key", 1000, 1)
 getBitInt = redisDB.GetBit("bit_key", 1000)
 countBitInt = redisDB.BitCount("bit_key", countBit)
 // 对"bit_key1","bit_key2"做AND位运算,并保存到"dest_key"
 andBitInt = redisDB.BitOpAnd("dest_key""bit_key1""bit_key2")
 // redisDB.BitPos("bit_key",false);
 posBitInt = redisDB.BitPos("bit_key", 1, 2)

 fmt.Println(getBitInt)
 fmt.Println(andBitInt)
 fmt.Println(countBitInt)
 fmt.Println(posBitInt)
}

4.事务

Redis使用MULTI标记一个事务块的开始。事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性(atomic)地执行。

具体代码:

package main

import (
 "fmt"
 "github.com/go-redis/redis"
 "strings"
 "time"
)

func main() {
 redisDB := redis.NewClient(&redis.Options{
  Addr:     "127.0.0.1:6379",
  Password: "",
  DB:       0,
 })

 // 开启一个TxPipeline事务
 pipe := redisDB.TxPipeline()
 defer pipe.Close()

 // 这里是放命令
 pipe.SetNX("Freeze:DL201544a00007:a00002:a1:300""200", 30*time.Second)
 pipe.SetNX("Freeze:DL201544a00008:a00002:a1:400""400", 30*time.Second)
 pipe.SetNX("Freeze:DL201544a00009:a00002:a1:500""500", 30*time.Second)
 pipe.SetNX("Freeze:DL201544a0000991123:a000091123:a1:592100""5923100", 30*time.Second)
 // 通过Exec函数提交redis事务
 r, err := pipe.Exec()
 if err != nil {
  // 取消提交
  pipe.Discard()
 }
 // 这里调用exec执行刚刚加的命令,redis的事务和mysql事务是不一样的,一般情况下,这里的err出错是在编译期间出错,运行期间是很少出错的
 // mysql的事务是具有原子性,一致性,隔离性 ,持久性

 // redis事务三阶段:
 //
 // 开启:以MULTI开始一个事务
 // 入队:将多个命令入队到事务中,接到这些命令并不会立即执行,而是放到等待执行的事务队列里面
 // 执行:由EXEC命令触发事务
 // redis事务三大特性:
 //
 // 单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
 // 没有隔离级别的概念:队列中的命令没有提交之前都不会实际的被执行,因为事务提交前任何指令都不会被实际执行,也就不存在”事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头痛的问题
 // 不保证原子性:redis同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
 //
 // 所以,如果出现,第一条数据处理是true,其他的处理是false,那需要把其中的为true的数据,撤销其操作,这里也只能手动去撤销

 var resultmap []map[string]string
 resultmap = make([]map[string]string, 0)
 for _, v := range r {
  params := fmt.Sprintf("%s", v)
  res := strings.Split(params, " ")
  fmt.Println("key=", res[1])
  // 处理结果
  fmt.Println("res=", res[6])

  if res[6] == "true" {
   var model map[string]string
   model = make(map[string]string, 0)
   model["key"] = res[1]
   model["result"] = res[6]
   resultmap = append(resultmap, model)
  }
 }
 // 这堆代码是为了事务处理结果不一致导致的问题
 if len(r) != len(resultmap) {
  for _, vb := range resultmap {
   redisDB.Del(vb["key"]).Result()
  }
 }
}

5.Lua脚本

Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,

不会有其他脚本或 Redis 命令被执行。这和使用 MULTI / EXEC 包围的事务很类似。

Redis-cli模式,命令行格式:

EVAL script numkeys key [key …] arg [arg …]

Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问键名参数;不是键名参数的附加参数 arg [arg ...] ,可以在 Lua 中通过全局变量 ARGV 数组访问。

在 Lua 脚本中,可以使用两个不同函数来执行 Redis 命令,它们分别是:redis.call()和redis.pcall(),这两个函数的唯一区别在于它们使用不同的方式处理

执行命令所产生的异常。对于Lua脚本,还有SCRIPT 命令进行管理,在此不表。

具体代码:

package main

import (
 "github.com/garyburd/redigo/redis"
)

const SCRIPT_VOICE2 = `
local voiceGiftNumKey = tostring(KEYS[1])
local voiceTemKey = tostring(KEYS[2])
local giftCom = tostring(ARGV[1])
local gnum = tonumber(ARGV[2])
local guest = tonumber(ARGV[3])
local tmpInfomarshal = tostring(ARGV[4])
local keyValidity = tonumber(ARGV[5])

if giftCom ~= 'A' and giftCom ~= 'B' and giftCom ~= 'C'
then 
  return 3
end

redis.call('hincrby',voiceGiftNumKey,giftCom,gnum)

local giftNumArray = redis.call('HMGET',voiceGiftNumKey,'A','B','C')

local minGiftNum = 99999
for i = 1,3 do
  local tmpNum = 0
  if giftNumArray[i] ~= false
  then
    tmpNum = tonumber(giftNumArray[i])
  end

  if tmpNum <= 0
  then
    return 2
  else
    if tmpNum< minGiftNum
    then
      minGiftNum = tmpNum
    end
  end
end

local newArray={}
for i = 1,3 do
  local tmpNum2 = tonumber(giftNumArray[i])
  newArray[i]=tmpNum2-minGiftNum
end
redis.call('HMSET',voiceGiftNumKey,'A',newArray[1],'B',newArray[2],'C',newArray[3])
if keyValidity > 0
then
 local voice= redis.call('SET',voiceTemKey,tmpInfomarshal,'EX',keyValidity,'NX')
 if voice ~= false
 then
  return 1
 end
end
return 0
`

var voiceScript2 = redis.NewScript(2, SCRIPT_VOICE2)

func main() {

 c, err := redis.Dial("tcp""127.0.0.1:6379")
 if err != nil {
  return
 }
 defer c.Close()

 reply, err := redis.Int(voiceScript2.Do(c, "Voice:RoomID:{686}:SeatId:{7c871778e2c011ea98b600163e012514}:TemInfo""voiceGiftNum:{86}:{86}""A", 1, 86))

 if reply == 0 {
  return
 }
}


以上是关于redis高级用法的主要内容,如果未能解决你的问题,请参考以下文章

redis高级用法

Redis高级用法

Redis高级用法

Python切片中的误区与高级用法

前端框架怎么用??用的好处是什么?

c_cpp 加载源图像固定用法(代码片段,不全)