分布式锁实现
Posted tarolord
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁实现相关的知识,希望对你有一定的参考价值。
为什么要用分布式锁?
分析下此场景:
- 某个小型电商网站,有个生成唯一订单号的服务。
- 如果同时多个用户下单,如何保障订单号的唯一性?
是的,加个锁可以解决。示范代码如下:
public class OrderServiceImplWithLock implements OrderService { private OrderCodeGenerator ocg = new OrderCodeGenerator(); private Lock lock = new ReentrantLock(); // 创建订单接口 @Override public void createOrder() { String orderCode = null; try { lock.lock(); // 获取订单号 orderCode = ocg.getOrderCode(); } finally { lock.unlock(); } // ……业务代码,此处省略100行代码 } }
- 随着业务增长,用户量增加,单台服务器已经无法满足了~~那么只能集群解决了!做了集群后,订单生成也改进为共享服务:
在各tomcat中使用了各自的锁(jvm的锁),但不能保障订单号唯一。
- 此时就需要用到分布式锁(用途:在分布式环境下协同共享资源的使用)
分布式锁有哪些实现方式?
- 分布式锁大致有如下3种常用实现方式:
- 基于zookeeper实现,注意:zookeeper有个特效是同父级下的节点不重名。
使用的临时节点,其特性是:当客户端与zookeeper断开连接后,临时节点会自动删除(那么锁也就释放了)。示范代码实现如下:
public class ZKDistributeLock implements Lock { private String lockPath; private ZkClient client; public ZKDistributeLock(String lockPath) { super(); this.lockPath = lockPath; client = new ZkClient("localhost:2181"); client.setZkSerializer(new MyZkSerializer()); } @Override public boolean tryLock() {
// 不会阻塞 // 创建节点 try { client.createEphemeral(lockPath); } catch (ZkNodeExistsException e) { return false; } return true; } @Override public void unlock() { client.delete(lockPath); }
@Override public void lock() { // 如果获取不到锁,阻塞等待 if (!tryLock()) { // 没获得锁,阻塞自己 waitForLock(); // 再次尝试 lock(); } } private void waitForLock() { CountDownLatch cdl = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----收到节点被删除了-------------"); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; client.subscribeDataChanges(lockPath, listener); // 阻塞自己 if (this.client.exists(lockPath)) { try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取消注册 client.unsubscribeDataChanges(lockPath, listener); } //省略部分不重要代码 }
此时出现一个问题:zookeeper的watch机制会实时通知用户,那么假如有1000个用户同事竞争锁,只有1个用户获得了锁,业务处理完后释放了锁,剩余的999个用户会同时收到watch通知...从而引起一个惊群效应!!!
- 惊群效应在较大规模的环境中带来如下危害:
- 那么采用临时顺序节点:
使用零时顺序节点,示例代码如下:
public class ZKDistributeImproveLock implements Lock { /* * 利用临时顺序节点来实现分布式锁 * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待 * 释放锁:删除自己创建的临时顺序节点 */ private String LockPath; private ZkClient client; private String currentPath; private String beforePath; public ZKDistributeImproveLock(String lockPath) { super(); LockPath = lockPath; client = new ZkClient("localhost:2181"); client.setZkSerializer(new MyZkSerializer()); if (!this.client.exists(LockPath)) { try { this.client.createPersistent(LockPath); } catch (ZkNodeExistsException e) { } } } @Override public boolean tryLock() { if (this.currentPath == null) { currentPath = this.client.createEphemeralSequential(LockPath + "/", "aaa"); } // 获得所有的子 List<String> children = this.client.getChildren(LockPath); // 排序list Collections.sort(children); // 判断当前节点是否是最小的 if (currentPath.equals(LockPath + "/" + children.get(0))) { return true; } else { // 取到前一个 // 得到字节的索引号 int curIndex = children.indexOf(currentPath.substring(LockPath.length() + 1)); beforePath = LockPath + "/" + children.get(curIndex - 1); } return false; } @Override public void lock() { if (!tryLock()) { // 阻塞等待 waitForLock(); // 再次尝试加锁 lock(); } } private void waitForLock() { CountDownLatch cdl = new CountDownLatch(1); // 注册watcher IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("-----监听到节点被删除"); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; client.subscribeDataChanges(this.beforePath, listener); // 怎么让自己阻塞 if (this.client.exists(this.beforePath)) { try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 醒来后,取消watcher client.unsubscribeDataChanges(this.beforePath, listener); } @Override public void unlock() { // 删除节点 this.client.delete(this.currentPath); } //省略不重要代码 }
- 基于Redis实现
示例代码如下:
public class RedisLockImpl implements Lock { StringRedisTemplate stringRedisTemplate; // redis工具类 String resourceName = null; int timeout = 10; ThreadLocal<String> lockRandomValue = new ThreadLocal<>(); /** * 构建一把锁 * * @param resourceName 资源唯一标识 * @param timeout 资源锁定超时时间~防止资源死锁,单位秒 */ public RedisLockImpl(String resourceName, int timeout, StringRedisTemplate stringRedisTemplate) { this.resourceName = "lock_" + resourceName; this.timeout = timeout; this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean tryLock() { Boolean lockResult = stringRedisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { // 随机设置一个值 lockRandomValue.set(UUID.randomUUID().toString()); Boolean status = connection.set(resourceName.getBytes(), lockRandomValue.get().getBytes(), Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT); return status; } }); return lockResult; } Lock lock = new ReentrantLock(); // 多台机器的情况下,会出现大量的等待,加重redis的压力。 在lock方法上,加入同步关键字。单机同步,多机用redis @Override public void lock() { lock.lock(); try { while (!tryLock()) { // 监听,锁删除的通知 stringRedisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { try { CountDownLatch waiter = new CountDownLatch(1); // 等待通知结果 connection.subscribe((message, pattern) -> { // 收到通知,不管结果,立刻再次抢锁 waiter.countDown(); }, (resourceName + "_unlock_channel").getBytes()); // 等待一段时间,超过这个时间都没收到消息,肯定有问题 waiter.await(timeout, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return true; //随便返回一个值都没问题 } }); } } finally { lock.unlock(); } } @Override public void unlock() { // 释放锁。(底层框架开发者要防止被API的调用者误调用) // 错误示范 stringRedisTemplate.delete(resourceName); // 1、 要比对内部的值,同一个线程,才能够去释放锁。 2、 同时发出通知 if (lockRandomValue.get().equals(stringRedisTemplate.opsForValue().get(resourceName))) { stringRedisTemplate.delete(resourceName); // 删除资源 stringRedisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { // 发送通知 Long received = connection.publish((resourceName + "_unlock_channel").getBytes(), "".getBytes()); return received; } }); } } //省略不重要代码 }
总的来说:
以上是关于分布式锁实现的主要内容,如果未能解决你的问题,请参考以下文章