用Redis实现分布式锁 与 实现任务队列

Posted 架构师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Redis实现分布式锁 与 实现任务队列相关的知识,希望对你有一定的参考价值。

架构师(JiaGouX)
我们都是架构师!



  这一次总结和分享用Redis实现分布式锁 与 实现任务队列 这两大强大的功能。先扯点个人观点,之前我看了一篇博文说博客园的文章大部分都是分享代码,博文里强调说分享思路比分享代码更重要(貌似大概是这个意思,若有误请谅解),但我觉得,分享思路固然重要,但有了思路,却没有实现的代码,那会让人觉得很浮夸的,在工作中的程序猿都知道,你去实现一个功能模块,一段代码,虽然你有了思路,但是实现的过程也是很耗时的,特别是代码调试,还有各种测试等等。所以我认为,思路+代码,才是一篇好博文的主要核心。

  直接进入主题。

  一、前言

  双十一刚过不久,大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了,另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个时候的先后顺序是很乱的,很容易出现10台的量,抢到的人就不止10个这种严重的问题。那么,以后所说的问题我们该如何去解决呢? 接下来我所分享的技术就可以拿来处理以上的问题: 分布式锁 和 任务队列。

  二、实现思路

  1.Redis实现分布式锁思路

    思路很简单,主要用到的redis函数是setnx(),这个应该是实现分布式锁最主要的函数。首先是将某一任务标识名(这里用Lock:order作为标识名的例子)作为键存到redis里,并为其设个过期时间,如果是还有Lock:order请求过来,先是通过setnx()看看是否能将Lock:order插入到redis里,可以的话就返回true,不可以就返回false。当然,在我的代码里会比这个思路复杂一些,我会在分析代码时进一步说明。

  2.Redis实现任务队列

    这里的实现会用到上面的Redis分布式的锁机制,主要是用到了Redis里的有序集合这一数据结构。例如入队时,通过zset的add()函数进行入队,而出对时,可以用到zset的getScore()函数。另外还可以弹出顶部的几个任务。

  以上就是实现 分布式锁 和 任务队列 的简单思路,如果你看完有点模棱两可,那请看接下来的代码实现。

  三、代码分析

  (一)先来分析Redis分布式锁的代码实现  

    (1)为避免特殊原因导致锁无法释放,在加锁成功后,锁会被赋予一个生存时间(通过lock方法的参数设置或者使用默认值),超出生存时间锁会被自动释放锁的生存时间默认比较短(秒级),因此,若需要长时间加锁,可以通过expire方法延长锁的生存时间为适当时间,比如在循环内。

    (2)系统级的锁当进程无论何种原因时出现crash时,操作系统会自己回收锁,所以不会出现资源丢失,但分布式锁不用,若一次性设置很长时间,一旦由于各种原因出现进程crash 或者其他异常导致unlock未被调用时,则该锁在剩下的时间就会变成垃圾锁,导致其他进程或者进程重启后无法进入加锁区域。

    先看加锁的实现代码:这里需要主要两个参数,一个是$timeout,这个是循环获取锁的等待时间,在这个时间内会一直尝试获取锁知道超时,如果为0,则表示获取锁失败后直接返回而不再等待;另一个重要参数的$expire,这个参数指当前锁的最大生存时间,以秒为单位的,它必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放。这个参数的最要作用请看上面的(1)里的解释。

    这里先取得当前时间,然后再获取到锁失败时的等待超时的时刻(是个时间戳),再获取到锁的最大生存时刻是多少。这里redis的key用这种格式:"Lock:锁的标识名",这里就开始进入循环了,先是插入数据到redis里,使用setnx()函数,这函数的意思是,如果该键不存在则插入数据,将最大生存时刻作为值存储,假如插入成功,则对该键进行失效时间的设置,并将该键放在$lockedName数组里,返回true,也就是上锁成功;如果该键存在,则不会插入操作了,这里有一步严谨的操作,那就是取得当前键的剩余时间,假如这个时间小于0,表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用,这时可以直接设置expire并把锁纳为己用。如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出循环,反之则 隔 $waitIntervalUs 后继续 请求。 这就是加锁的整一个代码分析。

/**

* 加锁

* @param [type] $name 锁的标识名

* @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待

* @param integer $expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放

* @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)

* @return [type] [description]

*/

public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {

if ($name == null) return false;


//取得当前时间

$now = time();

//获取锁失败时的等待超时时刻

$timeoutAt = $now + $timeout;

//锁的最大生存时刻

$expireAt = $now + $expire;


$redisKey = "Lock:{$name}";

while (true) {

//将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放

$result = $this->redisString->setnx($redisKey, $expireAt);


if ($result != false) {

//设置key的失效时间

$this->redisString->expire($redisKey, $expireAt);

//将锁标志放到lockedNames数组里

$this->lockedNames[$name] = $expireAt;

return true;

}


//以秒为单位,返回给定key的剩余生存时间

$ttl = $this->redisString->ttl($redisKey);


//ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)

//如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用

//这时可以直接设置expire并把锁纳为己用

if ($ttl < 0) {

$this->redisString->set($redisKey, $expireAt);

$this->lockedNames[$name] = $expireAt;

return true;

}


/*****循环请求锁部分*****/

//如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出

if ($timeout <= 0 || $timeoutAt < microtime(true)) break;


//隔 $waitIntervalUs 后继续 请求

usleep($waitIntervalUs);


}


return false;

}

    接着看解锁的代码分析:解锁就简单多了,传入参数就是锁标识,先是判断是否存在该锁,存在的话,就从redis里面通过deleteKey()函数删除掉锁标识即可。

/**

* 解锁

* @param [type] $name [description]

* @return [type] [description]

*/

public function unlock($name) {

//先判断是否存在此锁

if ($this->isLocking($name)) {

//删除锁

if ($this->redisString->deleteKey("Lock:$name")) {

//清掉lockedNames里的锁标志

unset($this->lockedNames[$name]);

return true;

}

}

return false;

}

   在贴上删除掉所有锁的方法,其实都一个样,多了个循环遍历而已。

/**

* 释放当前所有获得的锁

* @return [type] [description]

*/

public function unlockAll() {

//此标志是用来标志是否释放所有锁成功

$allSuccess = true;

foreach ($this->lockedNames as $name => $expireAt) {

if (false === $this->unlock($name)) {

$allSuccess = false;

}

}

return $allSuccess;

}

  以上就是用Redis实现分布式锁的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能模拟应用。想要深入了解的请看整个类的代码:

/**

*在redis上实现分布式锁

*/

class RedisLock {

private $redisString;

private $lockedNames = [];


public function __construct($param = NULL) {

$this->redisString = RedisFactory::get($param)->string;

}


/**

* 加锁

* @param [type] $name 锁的标识名

* @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待

* @param integer $expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放

* @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)

* @return [type] [description]

*/

public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {

if ($name == null) return false;


//取得当前时间

$now = time();

//获取锁失败时的等待超时时刻

$timeoutAt = $now + $timeout;

//锁的最大生存时刻

$expireAt = $now + $expire;


$redisKey = "Lock:{$name}";

while (true) {

//将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放

$result = $this->redisString->setnx($redisKey, $expireAt);


if ($result != false) {

//设置key的失效时间

$this->redisString->expire($redisKey, $expireAt);

//将锁标志放到lockedNames数组里

$this->lockedNames[$name] = $expireAt;

return true;

}


//以秒为单位,返回给定key的剩余生存时间

$ttl = $this->redisString->ttl($redisKey);


//ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)

//如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用

//这时可以直接设置expire并把锁纳为己用

if ($ttl < 0) {

$this->redisString->set($redisKey, $expireAt);

$this->lockedNames[$name] = $expireAt;

return true;

}


/*****循环请求锁部分*****/

//如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出

if ($timeout <= 0 || $timeoutAt < microtime(true)) break;


//隔 $waitIntervalUs 后继续 请求

usleep($waitIntervalUs);


}


return false;

}


/**

* 解锁

* @param [type] $name [description]

* @return [type] [description]

*/

public function unlock($name) {

//先判断是否存在此锁

if ($this->isLocking($name)) {

//删除锁

if ($this->redisString->deleteKey("Lock:$name")) {

//清掉lockedNames里的锁标志

unset($this->lockedNames[$name]);

return true;

}

}

return false;

}


/**

* 释放当前所有获得的锁

* @return [type] [description]

*/

public function unlockAll() {

//此标志是用来标志是否释放所有锁成功

$allSuccess = true;

foreach ($this->lockedNames as $name => $expireAt) {

if (false === $this->unlock($name)) {

$allSuccess = false;

}

}

return $allSuccess;

}


/**

* 给当前所增加指定生存时间,必须大于0

* @param [type] $name [description]

* @return [type] [description]

*/

public function expire($name, $expire) {

//先判断是否存在该锁

if ($this->isLocking($name)) {

//所指定的生存时间必须大于0

$expire = max($expire, 1);

//增加锁生存时间

if ($this->redisString->expire("Lock:$name", $expire)) {

return true;

}

}

return false;

}


/**

* 判断当前是否拥有指定名字的所

* @param [type] $name [description]

* @return boolean [description]

*/

public function isLocking($name) {

//先看lonkedName[$name]是否存在该锁标志名

if (isset($this->lockedNames[$name])) {

//从redis返回该锁的生存时间

return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name");

}


return false;

}


}


Redis实现分布式锁



来源:博客园

原文:http://www.cnblogs.com/it-cen/p/4984272.html

转载文章,向原作者致敬!如有侵权或不周之处,敬请劳烦联系若飞(微信:13511421494)马上删除,谢谢!

·END·






架构师

我们都是架构师!


架构师订阅号,关注获取更多技术分享


现已开通多个微信群,有兴趣交流学习的同学

可加若飞微信:13511421494进群

合作邮箱:admin@137x.com


以上是关于用Redis实现分布式锁 与 实现任务队列的主要内容,如果未能解决你的问题,请参考以下文章

Redis实现分布式锁与任务队列的思路

基于Redis实现分布式锁以及任务队列

(高级篇)基于Redis实现分布式锁以及任务队列

分布式定时任务的redis锁实现

分布式锁实现,与分布式定时任务

redis 分布式锁