分布式锁的实现方式
Posted 三名狂客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁的实现方式相关的知识,希望对你有一定的参考价值。
一、redis分布式锁
官方叫做 RedLock 算法,是 redis 官方支持的分布式锁算法。
分布式锁有 3 个重要的考量点:
(1)互斥(只能有一个客户端获取锁)
(2)不能死锁
(3)容错(只要大部分 redis 节点创建了这把锁就可以)
/**
* 操作redis缓存类
*/
@Slf4j
@Service
public class RedisLockService
// 锁过期时间
private static final long REDIS_LOCK_EXPIRE = 5000L;
// Redis获取锁单次重试等待时间
private static final long REDIS_LOCK_WAIT = 100L;
@Resource
private RedisClient<?> redisClient;
/**
* @param key RedisKey
* @param ttl 过期时间
* @return 是否成功获取锁
*/
public boolean getRedisLock(String key, long ttl)
return getRedisLock(key, 1, ttl);
/**
* @param key RedisKey
* @param retryTimes 重试次数
* @param ttl 过期时间
* @return 是否成功获取锁
*/
public boolean getRedisLock(String key, int retryTimes, long ttl)
if (StringUtils.isBlank(key))
return false;
for (int i = 0; i < retryTimes; i++)
boolean exists = redisClient.setNxDistributedLock(key, key, ttl);
if (exists)
return true;
try
TimeUnit.MILLISECONDS.sleep(REDIS_LOCK_WAIT);
catch (Exception e)
log.error("sleep fail", e);
return false;
/**
* @param key RedisKey
* @param retryTimes 重试次数
* @return 是否成功获取锁
*/
public boolean getRedisLock(String key, int retryTimes)
return getRedisLock(key, retryTimes, REDIS_LOCK_EXPIRE);
/**
* 查询key是否过期
*
* @param key RedisKey
* @return 是否过期
*/
public boolean isExpire(String key)
Long ttl = redisClient.ttl(key);
if (ttl == null)
return true;
return ttl != -1;
/**
* 删除RedisKey
*
* @param key RedisKey
*/
public void delRedisLock(String key)
if (StringUtils.isBlank(key))
return;
redisClient.del(key);
/**
* 设置key的过期时间为5000ms
*
* @param key RedisKey
*/
public void pExpire(String key)
if (StringUtils.isBlank(key))
return;
redisClient.pExpire(key, REDIS_LOCK_EXPIRE);
/**
* 设置key的过期时间(ms)
*
* @param key RedisKey
* @param millis 过期时间
*/
public void pExpire(String key, Long millis)
if (StringUtils.isBlank(key) || millis == null || millis <= 0)
return;
redisClient.pExpire(key, millis);
import lombok.Getter;
import lombok.Setter;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import java.io.Serializable;
import java.util.List;
public class RedisClient<V extends Serializable>
@Getter
@Setter
private RedisTemplate<Serializable, V> redisTemplate = null;
public byte[] doSerialize(String str)
return redisTemplate.getStringSerializer().serialize(str);
public String doDeSerialize(byte[] bytes)
return redisTemplate.getStringSerializer().deserialize(bytes);
/**
* 根据key设置元素(String)的过期时间
*/
public Boolean pExpire(final String key, final Long millis)
return redisTemplate.execute((RedisCallback<Boolean>) connection ->
if (millis > 0)
return connection.pExpire(doSerialize(key), millis);
return false;
);
/**
* 增加key value,并设置存活时间(秒),当存活时间为0时,会被删除
* 如果key已经有值,则新值覆盖旧值
*/
public void set(final String key, final String value)
set(key, value, 0L);
public void set(final String key, final String value, final Long liveTime)
redisTemplate.execute((RedisCallback<Long>) connection ->
connection.set(doSerialize(key), doSerialize(value));
if (liveTime > 0)
connection.expire(doSerialize(key), liveTime);
return 1L;
);
/**
* 根据key获取元素(String)
*/
public String get(final String key)
return redisTemplate.execute((RedisCallback<String>) connection ->
String getStr = null;
byte[] value = connection.get(doSerialize(key));
if (value != null)
getStr = doDeSerialize(value);
return getStr;
);
/**
* 在指定的 key 不存在时,为 key 设置指定的值
*/
public Boolean setnx(final String key, final String obj)
return redisTemplate.execute(
(RedisCallback<Boolean>) connection -> connection.setNX(doSerialize(key), doSerialize(obj)));
/**
* 删除指定key对应的元素,key不存在时会被忽略
* 返回被删除key的数量
*/
public Long del(final String key)
return redisTemplate.execute((RedisCallback<Long>) connection -> connection.del(doSerialize(key)));
/**
* 压栈,并设置超时时间,单位 秒
*/
public void lPush(String key, byte[] bytes, long liveTime)
redisTemplate.execute((RedisCallback<Integer>) connection ->
connection.lPush(doSerialize(key), bytes);
connection.expire(doSerialize(key), liveTime);
return 0;
);
/**
* 根据key返回对应的list
*/
public List<byte[]> lRange(String key)
return redisTemplate.execute(
(RedisCallback<List<byte[]>>) connection -> connection.lRange(doSerialize(key), 0, -1));
/**
* 向队列尾部添加数据
*/
public void rPush(String key, String value)
redisTemplate.execute((RedisCallback<Integer>) connection ->
connection.rPush(doSerialize(key), doSerialize(value));
return 0;
);
/**
* 从队列首部删除并获取一条数据
*/
public String lPop(String key)
return redisTemplate.execute((RedisCallback<String>) connection ->
byte[] value = connection.lPop(doSerialize(key));
if (value != null)
return doDeSerialize(value);
return null;
);
/**
* 修剪(删除)key所对应的0 ~ length之间的元素
* example:
* key : 1
* key : 2
* key : 3
* lTrim("key",2) 即只保留前三个元素,第1~3个元素将被保留,其余被删除,0代表第一个元素
* length 为-1,标识最后一个元素,-2为倒数第二个元素,以此类推
*/
public void lTrim(String key, long length)
redisTemplate.execute((RedisCallback<Integer>) connection ->
connection.lTrim(doSerialize(key), 0, length);
return 0;
);
public void lRem(String key, Object value)
lRem(key, 0, value);
public void lRem(String key, int count, Object value)
redisTemplate.execute((RedisCallback<Integer>) connection ->
connection.lRem(doSerialize(key), count, ProtostuffSerialize.serialize(value));
return 0;
);
public Object getObjectByProtostuff(final String key, Class<?> cls)
return redisTemplate.execute((RedisCallback<Object>) connection ->
byte[] value = connection.get(doSerialize(key));
if (value != null)
return ProtostuffSerialize.deserialize(value, cls);
return null;
);
public Boolean setObjectByProtostuff(final String key, final Object obj, final Long liveTime)
return redisTemplate.execute((RedisCallback<Boolean>) connection ->
Boolean flag = connection.set(doSerialize(key), ProtostuffSerialize.serialize(obj));
if (liveTime > 0)
connection.expire(doSerialize(key), liveTime);
return flag;
);
public Boolean setObjectByProtostuffExpireAt(final String key, final Object obj, final Long expireAt)
return redisTemplate.execute((RedisCallback<Boolean>) connection ->
Boolean flag = connection.set(doSerialize(key), ProtostuffSerialize.serialize(obj));
connection.expireAt(doSerialize(key), expireAt);
return flag;
);
public Boolean setNx(final String key, final Object object, final Long liveTime)
return redisTemplate.execute((RedisCallback<Boolean>) connection ->
boolean flag = connection.setNX(doSerialize(key), ProtostuffSerialize.serialize(object));
if (flag && liveTime > 0)
connection.expire(doSerialize(key), liveTime);
return flag;
);
public Long ttl(final String key)
return redisTemplate.execute((RedisCallback<Long>) connection -> connection.ttl(doSerialize(key)));
/**
* 分布式锁专用获取锁
*
* @param key 锁关键字
* @param uniqueId 锁唯一标识
* @param liveTime 过期时间
* @return
*/
public Boolean setNxDistributedLock(final String key, final String uniqueId, final Long liveTime)
return redisTemplate.execute(
(RedisCallback<Boolean>) connection -> connection.set(doSerialize(key), doSerialize(uniqueId),
Expiration.milliseconds(liveTime), RedisStringCommands.SetOption.ifAbsent()));
/**
* Set集合添加元素
*
* @param key
* @param value
* @return
*/
public Long sAdd(final String key, final String value)
return redisTemplate.execute((RedisCallback<Long>) connection ->
return connection.sAdd(doSerialize(key), doSerialize(value));
);
/**
* Set集合添加多个元素
*
* @param key
* @param values
* @return
*/
public Long sAdds(final String key, final String[] values)
return redisTemplate.execute((RedisCallback<Long>) connection ->
long success = 0;
for (int i = 0; i < values.length; i++)
success += connection.sAdd(doSerialize(key), doSerialize(values[i]));
return success;
);
/**
* Set集合取出1个元素并删除
*
* @param key
* @return
*/
public String sPop(final String key)
return redisTemplate.execute((RedisCallback<String>) connection ->
byte[] value = connection.sPop(doSerialize(key));
if (value != null)
return doDeSerialize(value);
return null;
);
/**
* Set集合取出count个元素并删除
*
* @param key
* @param count
* @return
*/
public List<byte[]> sPops(final String key, final int count)
return redisTemplate.execute((RedisCallback<List<byte[]>>) connection ->
return connection.sPop(doSerialize(key), count);
);
二、zk分布式锁
zk 分布式锁,其实可以做的比较简单,就是某个节点尝试创建临时 znode,此时创建成功了就获取了这个锁;这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个 znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁;后面的每个人都会去监听排在自己前面的那个人创建的 node 上,一旦某个人释放了锁,排在自己后面的人就会被 zookeeper 给通知,一旦被通知了之后,就 ok 了,自己就获取到了锁,就可以执行代码了。
public class ZooKeeperDistributedLock implements Watcher
private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
public ZooKeeperDistributedLock(String productId)
this.productId = productId;
try
String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
catch (IOException e)
throw new LockException(e);
catch (KeeperException e)
throw new LockException(e);
catch (InterruptedException e)
throw new LockException(e);
public void process(WatchedEvent event)
if (event.getState() == KeeperState.SyncConnected)
connectedLatch.countDown();
return;
if (this.latch != null)
this.latch.countDown();
public void acquireDistributedLock()
try
if (this.tryLock())
return;
else
waitForLock(waitNode, sessionTimeout);
catch (KeeperException e)
throw new LockException(e);
catch (InterruptedException e)
throw new LockException(e);
public boolean tryLock()
try
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);
if(lockNode.equals(locksRoot+"/"+ locks.get(0)))
//如果是最小的节点,则表示取得锁
return true;
//如果不是最小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = 0; i < locks.size(); i++)
if(lockNode.equals(locksRoot + “/” + locks.get(i)))
previousLockIndex = i - 1;
break;
this.waitNode = locks.get(previousLockIndex);
catch (KeeperException e)
throw new LockException(e);
catch (InterruptedException e)
throw new LockException(e);
return false;
private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null)
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
return true;
public void unlock()
try
// 删除/locks/10000000000节点
// 删除/locks/10000000001节点
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
catch (InterruptedException e)
e.printStackTrace();
catch (KeeperException e)
e.printStackTrace();
public class LockException extends RuntimeException
private static final long serialVersionUID = 1L;
public LockException(String e)
super(e);
public LockException(Exception e)
super(e);
三、二者比较(redis 分布式锁和 zk 分布式锁的对比)
redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。
另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。
redis 分布式锁大家没发现好麻烦吗?遍历上锁,计算时间等等zk 的分布式锁语义清晰实现简单。
所以先不分析太多的东西,就说这两点,我个人实践认为 zk 的分布式锁比 redis 的分布式锁牢靠、而且模型简单易用。
以上是关于分布式锁的实现方式的主要内容,如果未能解决你的问题,请参考以下文章