Redis高级用法与删除方式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis高级用法与删除方式相关的知识,希望对你有一定的参考价值。
参考技术A 一、redis为什么这么快1、完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1);
2、数据结构简单,对数据操作也简单,Redis中的数据结构是专门进行设计的;
3、采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
4、使用多路I/O复用模型,非阻塞IO;
5、使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;
以上几点都比较好理解,下边我们针对多路 I/O 复用模型进行简单的探讨:
(1)多路 I/O 复用模型
多路I/O复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。
这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响Redis性能的瓶颈,主要由以上几点造就了 Redis 具有很高的吞吐量。
采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;完全基于内存,没有磁盘IO的限制。在redis中对数据进行持久化的时候
resp通信协议
使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;
二、高级用法
1、位图
bitmap:体积小,10M可存储8千万bit位,最大允许访问512M,可存储40亿bit位
2、布隆过滤器
由一串很长的二进制向量组成,可以将其看成一个二进制数组(里面存放的都为0或者1,初始默认值为0)。向里面添加元素时,会使用多个hash函数进行计算取模得出一个数组的下标,每个hash都会算出不同的位置,再把这些位置都置为1,就完成了add操作
3、GEO
将指定的地理空间位置(经纬度及名称)添加到指定的key中,必须经度在纬度之前。(场景:微信摇一摇;周边餐饮;快递距离等)
4、HyperLogLog
用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。
HyperLogLog 可以接受多个元素作为输入,并给出输入元素的基数估算值:
• 基数:集合中不同元素的数量。比如 'apple', 'banana', 'cherry', 'banana', 'apple' 的基数就是 3 。
• 估算值:算法给出的基数并不是精确的,可能会比实际稍微多一些或者稍微少一些,但会控制在合
理的范围之内。
HyperLogLog 的优点是,即使输入元素的数量或者体积非常非常大,计算基数所需的空间总是固定
的、并且是很小的。
在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基
数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。
但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以
HyperLogLog 不能像集合那样,返回输入的各个元素。
提问:为什么不使用redis替代MQ
1、redis无法对消息持久化存储
2、redis没有提供消息传输保障
3、redis协议支持较少
三、删除方式
1、被动删除:惰性删除
客户端访问 key 的时候,redis 对 key 的过期时间进行检查,如果过期了就立即删除,不会给你返回任何东西。
2、主动删除:定期删除
redis 会将每个设置了过期时间的 key 放入到一个独立的字典中,以后会定期遍历这个字典来删除到期的 key。
3、主动删除:redis会周期性的随机测试一批设置了过期时间的key并进行处理(redis-3.0.0中默认值是10,代表每秒钟调用10次后台任务)
4、LRU:这个缓存算法将最近使用的条目存放到靠近缓存顶部的位置。当一个新条目被访问时,LRU将它放置到缓存的顶部。当缓存达到极限时,较早之前访问的条目将从缓存底部开始被移除。这里会使用到昂贵的算法,而且它需要记录“年龄位”来精确显示条目是何时被访问的。此外,当一个LRU缓存算法删除某个条目后,“年龄位”将随其他条目发生改变。
5、LFU:这个缓存算法使用一个计数器来记录条目被访问的频率。通过使用LFU缓存算法,最低访问数的条目首先被移除。这个方法并不经常使用,因为它无法对一个拥有最初高访问率之后长时间没有被访问的条目缓存负责。
redis高级用法
redis高级用法
Redis6.0已经低调的发布了稳定版,最大的变化就是支持I/O多线程,但旧版本就真的是单线程吗,事情往往不是这么简单,
这里的单线程指的是只有一个执行线程串行处理命令,再配合多路复用机制,实际上数据持久化、主从同步、连接释放等都
有其他线程来处理。既然6.0都出来了,之前的文章我也说了不少Redis相关的了,借此契机,我们就来看看Redis的一些高级用法吧。
目标:
-
Pipeline管道的使用 -
位图bitmap使用 -
Redis红锁原理 -
Redis事务 -
Lua脚本
1.Pipeline管道的使用
redis客户端执行一条命令分4个过程:
发送命令-〉命令排队-〉命令执行-〉返回结果
这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题
-
使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。
-
高频命令场景下,应避免使用管道,因为需要先将全部执行命令放入管道,会耗时。另外,需要使用返回值的情况也不建议使用,
同步所有管道返回结果也是个耗时的过程!管道无法提供原子性/事务保障。
-
非pipeline和pipeline两者性能对比,明显看出使用pipeline性能要快很多
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
)
func main() {
redisDB := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
var (
zreCmd *redis.Cmd
ZscCmd *redis.Cmd
myScore int64
myGouLiang int64
)
if _, err := redisDB.Pipelined(func(pipe redis.Pipeliner) error {
zreCmd = pipe.Do("Incrby", "aaaaa", "1")
ZscCmd = pipe.Do("Incrby", "bbbbb", "2")
return nil
}); err != nil && err != redis.Nil {
return
}
if zreCmd.Err() == nil {
myScore, _ = zreCmd.Int64()
myScore += 1
}
if ZscCmd.Err() == nil {
myGouLiang, _ = ZscCmd.Int64()
}
fmt.Println(myGouLiang)
fmt.Println(myScore)
}
2.分布式锁
首先,一个好的分布式锁,应该具有以下特征:
-
互斥——同时刻只能有一个持有者; -
可重入——同一个持有者可多次进入; -
阻塞——多个锁请求者时,未获得锁的阻塞等待; -
无死锁——持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取; -
容错——只要大部分节点正常,仍然可以获取和释放锁;
单Redis实例下,通过SETNX命令来实现Redis分布式锁,已不推荐使用,如果说使用SET命令配合NX参数或许会让面试官更为满意,
因为SET命令可带有EX/PX/NX/XX
参数,更为强大,可以完成更复杂业务,但这些在此不表,我想说的是分布式多实例下的红锁算法:
在Redis的分布式环境中,我们假设有N(建议为5)个Redis master,这些节点完全互相独立,不存在主从复制或者其他集群协调机制。
获取一个红锁的步骤如下:
-
获取当前系统时间,毫秒为单位 -
依次(同时并发地)尝试从N个实例,使用相同的Key和随机值(全局唯一)获取锁 -
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)即为获取锁消耗的时间。当且仅当从大多数( N/2+1
)的Redis节点都获取到锁,并且消耗时间小于锁失效时间时,红锁才算获取成功 -
如果获取到了红锁,key真正的有效时间等于有效时间减去获取锁消耗的时间(步骤3中的计算结果) -
如果获取红锁失败(没有在至少 N/2+1
个Redis实例取到锁或者取锁时间超过了锁的有效时间),客户端应该在所有的Redis实例上进行解锁(即使某些Redis实例根本就没有加锁成功)
注:
A. 锁指单Redis实例上使用如SET命令获取的锁,红锁是将多个单Redis实例锁组合为一个锁来管理,从而避免单点故障
B. 步骤2中,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,
则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,
客户端应该尽快尝试另外一个Redis实例。
C. 当客户端无法取到红锁时,应该在一个随机延迟后重试,防止多个客户端在同时抢夺同一资源的锁(这样会导致脑裂,没有人会取到锁),
并且当客户端从大多数Redis实例获取锁失败时,应该尽快地释放(部分)已经成功取到的锁,这样其他的客户端就不必非得等到锁过完“有效时间”才能取到,
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
"time"
)
type redisClient redis.Client
func connRedisCluster(address []string, password string) *redis.ClusterClient {
conf := redis.ClusterOptions{
Addrs: address,
Password: password,
}
return redis.NewClusterClient(&conf)
}
func connRedisSingle(addr, password string) *redis.Client {
conf := redis.Options{
Addr: addr,
Password: password,
}
return redis.NewClient(&conf)
}
func (r *redisClient) lock(value string) (error, bool) {
ret := r.SetNX("hello", value, time.Second*10)
if err := ret.Err(); err != nil {
fmt.Printf("set value %s error: %v\n", value, err)
return err, false
}
return nil, ret.Val()
}
func (r *redisClient) unlock() bool {
ret := r.Del("hello")
if err := ret.Err(); err != nil {
fmt.Println("unlock error: ", err)
return false
}
return true
}
func (r *redisClient) retryLock() bool {
ok := false
for !ok {
err, t := r.getTTL()
if err != nil {
return false
}
if t > 0 {
fmt.Printf("锁被抢占, %f 秒后重试...\n", (t / 10).Seconds())
time.Sleep(t / 10)
}
err, ok = r.lock("Jan")
if err != nil {
return false
}
}
return ok
}
func (r *redisClient) getLock() (error, string) {
ret := r.Get("hello")
if err := ret.Err(); err != nil {
fmt.Println("get lock error: ", err)
return err, ""
}
rt, _ := ret.Bytes()
return nil, string(rt)
}
// 获取锁的过期剩余时间
func (r *redisClient) getTTL() (error, time.Duration) {
ret := r.TTL("hello")
if err := ret.Err(); err != nil {
fmt.Println("get TTL error: ", err)
return err, 0
}
return nil, ret.Val()
}
func (r *redisClient) threadLock(threadId string) {
for {
err, _ := r.getLock()
if err != nil && err.Error() == "redis: nil" {
// 没有获取到值,说明目前没有人持有锁
fmt.Printf("线程 %s 开始加锁\n", threadId)
err, ok := r.lock("Jan")
if err != nil {
return
}
if !ok {
if !r.retryLock() {
fmt.Printf("线程 %s 加锁失败\n", threadId)
return
}
}
fmt.Printf("线程 %s 已加锁\n", threadId)
// 加锁后执行相应操作
time.Sleep(5 * time.Second)
// 释放锁
r.unlock()
fmt.Printf("线程 %s 已释放锁\n", threadId)
return
} else if err != nil {
return
}
err, t := r.getTTL()
if err != nil {
return
}
if t > 0 {
fmt.Printf("线程 %s 锁被占用, %f 秒后重试\n", threadId, (t / 10).Seconds())
time.Sleep(t / 10)
}
}
}
func main() {
var r redisClient
address := "127.0.0.1:6379"
cl := connRedisSingle(address, "")
defer cl.Close()
r = redisClient(*cl)
// 线程1获取锁
go r.threadLock("1")
// time.Sleep(10 * time.Millisecond)
// 线程2获取锁
go r.threadLock("2")
select {}
}
3.位图
Redis中有一种特殊的存储类型,二进制位对象——位图(bitmap),存储上是按照”字符串”处理,如果看官知晓布隆过滤器,就应该十分熟悉位图了,其特点很明显,
就是每位只有0/1两种状态值,如下图示例,然后就是体积小,10M可存储8千万bit位,最大允许512M,可存储40亿bit位。
Redis-cli中只有6个命令:
-
SETBIT:设置或清除指定偏移量上的位(bit)。当 key 不存在时,自动生成一个新的字符串值。字符串会进行伸展(grown)以确保它可以将 value 保存在指定的偏移量上。当字符串值进行伸展时,空白位置以 0 填充。 -
GETBIT:对 key 所储存的字符串值,获取指定偏移量上的位(bit)。当 offset 比字符串值的长度大,或者 key 不存在时,返回 0。 -
BITCOUNT:计算给定字符串中,被设置为 1 的比特位的数量。对一个不存在的 key 进行 BITCOUNT 操作,结果为 0 。 -
BITPOS:返回位图中第一个值为 bit 的二进制位的位置。 -
BITTOP:对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。 -
BITFIELD:将一个 Redis 字符串看作是一个由二进制位组成的数组, 并对这个数组中储存的长度不同的整数进行访问。
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
)
func main() {
redisDB := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
var (
getBitInt *redis.IntCmd
andBitInt *redis.IntCmd
posBitInt *redis.IntCmd
countBitInt *redis.IntCmd
countBit *redis.BitCount
)
redisDB.SetBit("bit_key", 1000, 1)
getBitInt = redisDB.GetBit("bit_key", 1000)
countBitInt = redisDB.BitCount("bit_key", countBit)
// 对"bit_key1","bit_key2"做AND位运算,并保存到"dest_key"中
andBitInt = redisDB.BitOpAnd("dest_key", "bit_key1", "bit_key2")
// redisDB.BitPos("bit_key",false);
posBitInt = redisDB.BitPos("bit_key", 1, 2)
fmt.Println(getBitInt)
fmt.Println(andBitInt)
fmt.Println(countBitInt)
fmt.Println(posBitInt)
}
4.事务
Redis使用MULTI标记一个事务块的开始。事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性(atomic)地执行。
具体代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
"strings"
"time"
)
func main() {
redisDB := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
// 开启一个TxPipeline事务
pipe := redisDB.TxPipeline()
defer pipe.Close()
// 这里是放命令
pipe.SetNX("Freeze:DL201544a00007:a00002:a1:300", "200", 30*time.Second)
pipe.SetNX("Freeze:DL201544a00008:a00002:a1:400", "400", 30*time.Second)
pipe.SetNX("Freeze:DL201544a00009:a00002:a1:500", "500", 30*time.Second)
pipe.SetNX("Freeze:DL201544a0000991123:a000091123:a1:592100", "5923100", 30*time.Second)
// 通过Exec函数提交redis事务
r, err := pipe.Exec()
if err != nil {
// 取消提交
pipe.Discard()
}
// 这里调用exec执行刚刚加的命令,redis的事务和mysql事务是不一样的,一般情况下,这里的err出错是在编译期间出错,运行期间是很少出错的
// mysql的事务是具有原子性,一致性,隔离性 ,持久性
// redis事务三阶段:
//
// 开启:以MULTI开始一个事务
// 入队:将多个命令入队到事务中,接到这些命令并不会立即执行,而是放到等待执行的事务队列里面
// 执行:由EXEC命令触发事务
// redis事务三大特性:
//
// 单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
// 没有隔离级别的概念:队列中的命令没有提交之前都不会实际的被执行,因为事务提交前任何指令都不会被实际执行,也就不存在”事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头痛的问题
// 不保证原子性:redis同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
//
// 所以,如果出现,第一条数据处理是true,其他的处理是false,那需要把其中的为true的数据,撤销其操作,这里也只能手动去撤销
var resultmap []map[string]string
resultmap = make([]map[string]string, 0)
for _, v := range r {
params := fmt.Sprintf("%s", v)
res := strings.Split(params, " ")
fmt.Println("key=", res[1])
// 处理结果
fmt.Println("res=", res[6])
if res[6] == "true" {
var model map[string]string
model = make(map[string]string, 0)
model["key"] = res[1]
model["result"] = res[6]
resultmap = append(resultmap, model)
}
}
// 这堆代码是为了事务处理结果不一致导致的问题
if len(r) != len(resultmap) {
for _, vb := range resultmap {
redisDB.Del(vb["key"]).Result()
}
}
}
5.Lua脚本
Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,
不会有其他脚本或 Redis 命令被执行。这和使用 MULTI / EXEC 包围的事务很类似。
Redis-cli模式,命令行格式:
EVAL script numkeys key [key …] arg [arg …]
Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问键名参数;不是键名参数的附加参数 arg [arg ...] ,可以在 Lua 中通过全局变量 ARGV 数组访问。
在 Lua 脚本中,可以使用两个不同函数来执行 Redis 命令,它们分别是:redis.call()和redis.pcall(),这两个函数的唯一区别在于它们使用不同的方式处理
执行命令所产生的异常。对于Lua脚本,还有SCRIPT 命令进行管理,在此不表。
具体代码:
package main
import (
"github.com/garyburd/redigo/redis"
)
const SCRIPT_VOICE2 = `
local voiceGiftNumKey = tostring(KEYS[1])
local voiceTemKey = tostring(KEYS[2])
local giftCom = tostring(ARGV[1])
local gnum = tonumber(ARGV[2])
local guest = tonumber(ARGV[3])
local tmpInfomarshal = tostring(ARGV[4])
local keyValidity = tonumber(ARGV[5])
if giftCom ~= 'A' and giftCom ~= 'B' and giftCom ~= 'C'
then
return 3
end
redis.call('hincrby',voiceGiftNumKey,giftCom,gnum)
local giftNumArray = redis.call('HMGET',voiceGiftNumKey,'A','B','C')
local minGiftNum = 99999
for i = 1,3 do
local tmpNum = 0
if giftNumArray[i] ~= false
then
tmpNum = tonumber(giftNumArray[i])
end
if tmpNum <= 0
then
return 2
else
if tmpNum< minGiftNum
then
minGiftNum = tmpNum
end
end
end
local newArray={}
for i = 1,3 do
local tmpNum2 = tonumber(giftNumArray[i])
newArray[i]=tmpNum2-minGiftNum
end
redis.call('HMSET',voiceGiftNumKey,'A',newArray[1],'B',newArray[2],'C',newArray[3])
if keyValidity > 0
then
local voice= redis.call('SET',voiceTemKey,tmpInfomarshal,'EX',keyValidity,'NX')
if voice ~= false
then
return 1
end
end
return 0
`
var voiceScript2 = redis.NewScript(2, SCRIPT_VOICE2)
func main() {
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return
}
defer c.Close()
reply, err := redis.Int(voiceScript2.Do(c, "Voice:RoomID:{686}:SeatId:{7c871778e2c011ea98b600163e012514}:TemInfo", "voiceGiftNum:{86}:{86}", "A", 1, 86))
if reply == 0 {
return
}
}
以上是关于Redis高级用法与删除方式的主要内容,如果未能解决你的问题,请参考以下文章