JAVA分布式锁介绍
Posted tuacy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA分布式锁介绍相关的知识,希望对你有一定的参考价值。
上一篇咱们讲到同一个进程,不同的线程之间我们可以通过synchronized、ReentrantLock、ReadWriteLock、Semaphore、CountDownLatch这些来实现锁机制。现在情况不一样了,咱们的程序高大上了,咱们可以部署多个服务端了,上了分布式系统了。在这个时候锁就要上升一个档次了,现在就叫分布式锁了。
分布式锁就是在分布式系统中(多服务端,多进程)中保证数据的最终唯一性。当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。现在市面上常见的有三种实现分布式锁的方案:基于数据库做分布式锁、基于redis做分布式锁、基于zookeeper做分布式锁。
一 基于数据库做分布式锁
1.1 基于主键的唯一性做分布式锁(悲观锁)
既然是通过数据库来做分布式锁,那咱们的先建一张表先,稍后我们在做解释。
unionkeylock表(resource_name唯一索引)
CREATE TABLE `unionkeylock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`resource_name` varchar(128) NOT NULL DEFAULT '' COMMENT '锁定的资源',
`node_info` varchar(128) DEFAULT NULL COMMENT '机器信息',
`count` int(11) NOT NULL DEFAULT '0' COMMENT '锁的次数,统计可重入锁',
`des` varchar(128) DEFAULT null comment '额外的描述信息',
`update_time` timestamp NULL DEFAULT NULL COMMENT '更新时间',
`create_time` timestamp null default null comment '创建时间',
primary key (`id`),
unique key (`resource_name`)
) engine=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表-主键唯一';;
unionkeylock表,我们对resource_name做了唯一性约束,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,在这种情况下我们认为操作成功的那个线程获得了该方法的锁。
我们干脆一点,直接上代码。
public class DbDistributedUnionKeyLock extends AbstractDbDistributedLock
/**
* 因为数据库里面限制了最大长度了
*/
private static final int NODE_INFO_MAX_LENGTH = 120;
/**
* 计算机唯一标识 为可重入锁做准备(一定要确保每个机器的值不同)
*/
private static final String COMPUTER_UUID = ComputerIdentifierUtil.getComputerIdentifier();
/**
* 线程变量
*/
private ThreadLocal<String> threadFlag = new ThreadLocal<>();
/**
* 操作数据库的dao
*/
private IUnionKeyLockDao unionKeyLockDao;
public DbDistributedUnionKeyLock(IUnionKeyLockDao unionKeyLockDao)
this.unionKeyLockDao = unionKeyLockDao;
/**
* 加锁
*/
@Override
public boolean lock(String key, int retryTimes, long sleepMillis)
boolean lockSuccess = false;
// 机器码+线程uuid -- 唯一标识(保证同一台电脑的同一个线程是一样的)
if (threadFlag.get() == null || threadFlag.get().isEmpty())
String nodeTemp = COMPUTER_UUID + "#" + String.format("%08x", UUID.randomUUID().hashCode()) + "#" + Thread.currentThread().getId();
if (nodeTemp.length() > NODE_INFO_MAX_LENGTH)
nodeTemp = nodeTemp.substring(0, NODE_INFO_MAX_LENGTH);
threadFlag.set(nodeTemp);
int retry = 0;
while (!lockSuccess && retry < retryTimes)
try
UnionKeyLock lockInfo = unionKeyLockDao.getLockInfoByResourceName(key);
if (lockInfo == null)
// 当前资源没有被加锁
lockSuccess = unionKeyLockDao.insertLockInfo(key, threadFlag.get());
else
// 当前资源已经被加锁,这个时候需要考虑是否可重入
// 可重入锁
if (lockInfo.getNodeInfo() != null && lockInfo.getNodeInfo().equals(threadFlag.get()))
// 重入
lockSuccess = unionKeyLockDao.reentrantLock(lockInfo.getResourceName(), threadFlag.get(), lockInfo.getCount());
else
// 如果出现这种情况代表数据已经有问题了
lockSuccess = false;
if (!lockSuccess)
// 等待200毫秒
Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
catch (Exception e)
// 等待指定时间
Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
finally
retry++;
return lockSuccess;
/**
* 释放
*/
@Override
public void unlock(String key)
if (threadFlag.get() == null || threadFlag.get().isEmpty())
return;
boolean unlockSuccess = false;
while (!unlockSuccess)
try
UnionKeyLock lockInfo = unionKeyLockDao.getLockInfoByResourceName(key);
if (lockInfo == null)
return;
if (lockInfo.getNodeInfo() == null)
unionKeyLockDao.deleteLockInfo(lockInfo.getResourceName(), lockInfo.getNodeInfo());
return;
if (!lockInfo.getNodeInfo().equals(threadFlag.get()))
return;
// 可重入锁
if (lockInfo.getCount() == 1)
unionKeyLockDao.deleteLockInfo(lockInfo.getResourceName(), threadFlag.get());
unlockSuccess = true;
else
if (lockInfo.getNodeInfo() != null && lockInfo.getNodeInfo().equals(threadFlag.get()))
// 重入
unlockSuccess = unionKeyLockDao.reentrantUnLock(lockInfo.getResourceName(), threadFlag.get(), lockInfo.getCount());
else
// 如果出现这种情况,代表这时候数据已经有问题了
unionKeyLockDao.deleteLockInfo(lockInfo.getResourceName(), lockInfo.getNodeInfo());
unlockSuccess = true;
if (!unlockSuccess)
// 等待200毫秒
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
catch (Exception e)
// e.printStackTrace();
// 等待200毫秒
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
因为悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性。如果加锁的时间过长,其他用户长时间无法访问,影响了程序的并发访问性,同时这样对数据库性能开销影响也很大,特别是对长事务而言,这样的开销往往无法承受。所以与悲观锁相对的,我们有了乐观锁。
1.2 基于版本号字段做分布式锁(乐观锁)
基于表字段版本号做分布式锁(乐观锁):很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,需要数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则重试。乐观锁适用于读多写少的应用场景,这样可以提高吞吐量。
通过版本号做分布式锁我们不太好去抽象出lock()、unlock()这样的函数了。得把他们写到业务逻辑里面去。下面我们以一个简单的例子来说明。比如有这么个场景:我们模拟一个存钱的场景,我们可以在各个地方存钱(分布式)。我们创建一个表,字段value就表示我们的余额。
CREATE TABLE `optimisticlock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`resource_name` varchar(128) NOT NULL DEFAULT '' COMMENT '锁定的资源',
`value` int NOT NULL COMMENT '锁定的资源对应的值',
`version` int NOT NULL COMMENT '版本信息',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表-乐观锁';
直接上代码。(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)
public class SqlOptimisticLock
/**
* 因为数据库里面限制了最大长度了
*/
private static final int LOCK_LOOP_TIME_WAIT = 200;
/**
* lock对应的资源名字
*/
private final String resourceName;
/**
* 操作数据库的dao
*/
private IOptimisticLockDao optimisticLockDao;
/**
* 用于注入无法自动装配的Bean,处理@Autowired无效的问题
*/
private void inject()
if (null == this.optimisticLockDao)
this.optimisticLockDao = ResourceApplicationContext.getApplicationContext().getBean(OptimisticLockDaoImpl.class);
/**
* 构造函数
*
* @param resourceName lock对应资源名字
*/
public SqlOptimisticLock(String resourceName)
this.resourceName = resourceName;
inject();
/**
* 我们简单的模拟一个存钱的操作
*
* @param money 存入金额
*/
public void depositMoney(int money)
boolean success = false;
while (!success)
try
// 第一步:版本号信息很重要,从表里面取出数据(包括版本号)
OptimisticLock dbItemInfo = optimisticLockDao.selectLockResourceInfo(resourceName);
// 第二步:todo:做相应的逻辑处理
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
// 第三步:把数据存到数据库里面去
if (dbItemInfo == null)
success = optimisticLockDao.insertLockResourceValue(resourceName, money);
else
// 更新的时候会去做下版本的判断,相同才更新,不相同不更新
success = optimisticLockDao.updateLockResourceValue(resourceName, dbItemInfo.getVersion(), dbItemInfo.getValue() + money);
if (!success)
Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
catch (Exception e)
// e.printStackTrace();
success = false;
Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
1.3 基于数据库排他锁做分布式锁(悲观锁)
借助数据中自带的排他锁来实现分布式的锁。在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。特别不建议用这种方式来实现,对数据库影响太大了。
正对这种情况,我们也写一个简单的代码,还是先创建表,建表语句如下。
CREATE TABLE `exclusivelock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`resource_name` varchar(128) NOT NULL DEFAULT '' COMMENT '锁定的资源',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表-排他锁';
加锁,释放锁代码实现。(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)
public class SqlExclusiveLock implements IDistributedLock
/**
* 因为数据库里面限制了最大长度了
*/
private static final int LOCK_LOOP_TIME_WAIT = 200;
private JdbcTemplate jdbcTemplate;
/**
* lock对应的资源名字
*/
private final String resourceName;
private Connection sqlConnection;
/**
* 用于注入无法自动装配的Bean,处理@Autowired无效的问题
*/
private void inject()
if (null == this.jdbcTemplate)
this.jdbcTemplate = ResourceApplicationContext.getApplicationContext().getBean(JdbcTemplate.class);
/**
* 构造函数
*
* @param resourceName lock对应资源名字
*/
public SqlExclusiveLock(String resourceName)
this.resourceName = resourceName;
inject();
@Override
public void lock()
if (jdbcTemplate.getDataSource() == null)
throw new IllegalArgumentException("数据库配置失败!");
boolean success = false;
while (!success)
PreparedStatement preparedStatement = null;
try
sqlConnection = jdbcTemplate.getDataSource().getConnection();
sqlConnection.setAutoCommit(false);//设置手动提交
String prepareSql = "select * from exclusivelock where resource_name = ? for update";
preparedStatement = sqlConnection.prepareStatement(prepareSql);
preparedStatement.setString(1, this.resourceName);
success = preparedStatement.executeQuery() != null;
if (!success)
Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
catch (Exception e)
e.printStackTrace();
success = false;
Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
finally
if (preparedStatement != null)
try
preparedStatement.close();
catch (SQLException e)
e.printStackTrace();
@Override
public void unlock()
if (sqlConnection != null)
try
sqlConnection.commit();
catch (Exception e)
// ignore
e.printStackTrace();
@Override
public boolean tryLock()
return false;
@Override
public boolean tryLock(long time, TimeUnit unit)
return false;
二 基于Redis做分布式锁
基于Redis做分布式锁。Redis里面的记录可以设置为当记录不存在的时候才可以插入进入。记录存在则插入不进去。Redis分布式锁就是用这个做的。当获取锁的时候则在Redis里面插入一条这样的记录。释放锁的时候就删除这条记录。如果锁没有释放(记录没有被删除)其他的让你再去获取锁的时候(插入记录)是不会成功的。而且为了防止僵尸我们可以给这条记录设置过期时间。
讲什么都没带来来的实在点。我们就直接贴代码了(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)
public class RedisDistributedLockImpl extends AbstractDistributedLock
private final Logger logger = LoggerFactory.getLogger(RedisDistributedLockImpl.class);
private RedisTemplate<Object, Object> redisTemplate;
private ThreadLocal<String> lockFlag = new ThreadLocal<>();
/**
* Lua
*/
private static final String UNLOCK_LUA;
static
StringBuilder sb = new StringBuilder();
sb.append("if redis.call(\\"get\\",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call(\\"del\\",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
/**
* 构造函数
*/
public RedisDistributedLockImpl(RedisTemplate<Object, Object> redisTemplate)
super();
this.redisTemplate = redisTemplate;
/**
* 加锁
*/
@Override
public boolean lock(String key, long expire, int retryTimes, long sleepMillis)
boolean result = setRedis(key, expire);
// 如果获取锁失败,按照传入的重试次数进行重试
while ((!result) && retryTimes-- > 0)
try
logger.debug("lock failed, retrying..." + retryTimes);
Thread.sleep(sleepMillis);
catch (InterruptedException e)
return false;
result = setRedis(key, expire);
return result;
private boolean setRedis(String key, long expire)
try
String result = redisTemplate.execute((RedisCallback<String>) connection ->
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
// value生成
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
/**
* 存储数据到缓存中,并指定过期时间和当Key存在时是否覆盖。
*
* @param key 键
* @param key 键值
* @param nxxx
* nxxx的值只能取NX或者XX,如果是NX的时候,则只有当key不存在是才进行set,如果是XX,则只有当key已经存在时才进行set
*
* @param expx expx的值只能取EX或者PX,代表数据过期时间的单位,EX代表秒,PX代表毫秒。
* @param time 过期时间,单位是expx所代表的单位。
* @return 成功返回“ok”,失败则返回 null。
*/
return commands.set(key, uuid, "NX", "PX", expire);
);
return !StringUtils.isEmpty(result);
catch (Exception e)
logger.error("set redis occured an exception", e);
return false;
/**
* 释放锁
*/
@Override
public boolean unlock(String key)
// 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
try
List<String> keys = new ArrayList<>();
keys.add(key);
List<String> args = new ArrayList<>();
args.add(lockFlag.get());
// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
Object nativeConnection = connection.getNativeConnection();
// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
// 集群模式
if (nativeConnection instanceof JedisCluster)
// Redis Eval 命令使用 Lua 解释器执行脚本
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
// 单机模式
else if (nativeConnection instanceof Jedis)
// Redis Eval 命令使用 Lua 解释器执行脚本
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
return 0L;
);
return result != null && result > 0;
catch (Exception e)
logger.error("release lock occured an exception", e);
return false;
三 基于ZooKeeper做分布式锁
Zookeeper实现分布式锁的流程,假设锁空间的根节点为/zklock:
- 客户端连接zookeeper,并在/zklock下创建临时的且有序的子节点。
第一个客户端对应的子节点为:/zklock/test_lock_0000000000,第二个为:/zklock/test_lock_0000000001。以此类推。 - 客户端获取/zklock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听/zklock的子节点变更消息,获得子节点变更通知后重复此步骤直至获得锁;
- 执行业务代码。
- 完成业务流程后,删除对应的子节点并释放锁。
这里我不准备用zookeeper的原生api来实现这一套流程来实现分布式锁。因为我找到更简单的方式,这里我准备用Curator(Curator是Netflix公司开源的一套zookeeper客户端框架)来实现分布式锁。因为Curator已经帮我们实现了分布式锁InterProcessMutex的实现。关于Curator的使用大家有兴趣可以参考下ZooKeeper客户端Curator使用
直接代码实现,(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)
public class ZookeeperDistributedLockImpl implements IZookeeperDistributedLock
/**
* curator给封住好的一个分布式锁对象和ReentrantLock类似
*/
private CuratorFramework curatorFramework;
public ZookeeperDistributedLockImpl(ZkClient zkClient)
curatorFramework = zkClient.getClient();
@Override
public boolean lock(String key)
try
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, key);
interProcessMutex.acquire();
return true;
catch (Exception e)
e.printStackTrace();
return false;
@Override
public boolean lock(String key, long time, TimeUnit unit)
try
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, key);
interProcessMutex.acquire(time, unit);
return true;
catch (Exception e)
e.printStackTrace();
return false;
@Override
public void unlock(String key)
try
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, key);
interProcessMutex.release();
catch (Exception e)
e.printStackTrace();
// ignore
分布式锁相关的内容就介绍到这里,每种分布式锁的时候我们都写了对应的实现代码。大家可以在https://github.com/tuacy/java-study里面找到更加完整的代码,在distributedlock模块下面。而且每种锁的实现都用想用的AOP。
以上是关于JAVA分布式锁介绍的主要内容,如果未能解决你的问题,请参考以下文章