分布式锁的实现方式

Posted 后端开发者中心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁的实现方式相关的知识,希望对你有一定的参考价值。

分布式锁的实现方式

分布式锁常见的三种实现方式

  • 基于redis的分布式锁

  • 基于zookeeper的分布式锁

  • 数据库乐观锁

可靠性

为了确保分布式锁可用,我们要确保锁的实现满足以下四个条件:

  1. 互斥性。即在任意时刻,只有一个客户端能持有锁。

  2. 不会发生死锁。即使有一个客户端在持有锁期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

  3. 解铃还需系铃人。加锁和解锁必须是同一个客户端。

  4. 具有容错性[集群环境]。只要大部分的redis节点正常运行,客户端就可以加锁和解锁。

以下介绍比较常用的前两种实现方式,基于缓存redis以及zookeeper实现分布式锁。

1.基于缓存redis实现

组件依赖

 
   
   
 
  1. <dependency>

  2.    <groupId>redis.clients</groupId>

  3.    <artifactId>jedis</artifactId>

  4.    <version>2.9.0</version>

  5. </dependency>

加锁代码

 
   
   
 
  1. public class RedisTool {

  2.    private static final String LOCK_SUCCESS = "OK";

  3.    private static final String SET_IF_NOT_EXIST = "NX";

  4.    private static final String SET_WITH_EXPIRE_TIME = "PX";

  5.    /**

  6.     * 尝试获取分布式锁

  7.     * @param jedis Redis客户端

  8.     * @param lockKey 锁

  9.     * @param requestId 请求标识

  10.     * @param expireTime 超期时间

  11.     * @return 是否获取成功

  12.     */

  13.    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

  14.        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

  15.        if (LOCK_SUCCESS.equals(result)) {

  16.            return true;

  17.        }

  18.        return false;

  19.    }

  20. }

加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

  • 第一个为key,我们使用key来当锁,因为key是唯一的。

  • 第二个为value,我们传的是requestId,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randonUUID().toString()方法生成。

  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,进行set操作;若key已经存在,则不做任何操作。

  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

  • 第五个为time,与第四个参数呼应,代表key的过期时间。

执行上面的set()方法就只会导致两种结果:1.当前没有锁(key不存在),就进行加锁操作,并对锁设置有效期,同时value表示加锁的客户端。2.已有锁存在,不做任何操作。

我们加锁的代码满足可靠性里描述的三个条件:

  • set()加入了NX参数,保证如果已有key存在,则方法不会调用成功,即同一时间只有一个客户端能持有锁,满足互斥性。

  • 由于对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁,不会发生死锁。

  • 将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端解锁的时候就可以进行校验是否是同一个客户端。

  • 本方法只考虑redis单机部署的场景,所以容错性暂不考虑。可以尝试使用Redisson实现。

解锁代码

 
   
   
 
  1. public class RedisTool {

  2.    private static final Long RELEASE_SUCCESS = 1L;

  3.    /**

  4.     * 释放分布式锁

  5.     * @param jedis Redis客户端

  6.     * @param lockKey 锁

  7.     * @param requestId 请求标识

  8.     * @return 是否释放成功

  9.     */

  10.    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

  11.        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

  12.        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

  13.        if (RELEASE_SUCCESS.equals(result)) {

  14.            return true;

  15.        }

  16.        return false;

  17.    }

  18. }

第一行代码写了一个简单的Lua脚本代码。第二行将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给redis服务端执行。 这段Lua代码的功能是,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。使用Lua语言实现可以保证上述操作是原子性的。执行eval()方法可以确保原子性。简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,redis才会执行其他命令。

2.基于zookeeper实现

这里利用zookeeper的EPHEMERAL_SEQUENTIAL(临时顺序)类型节点及watcher机制,来简单实现分布式锁。

主要思想:

  • 开启10个线程,在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点;

  • 获取disLocks节点下所有子节点,排序,如果自己的节点编号最小,则获取锁;

  • 否则watch排在自己前面的节点,监听到其删除后,进入第2步(重新检测排序是防止监听的节点发生连接失效,导致的节点删除情况);

  • 删除自身sub节点,释放连接。

tip:zookeeper的4种类型节点

  • PERSISTENT -- 持久节点,节点创建后会一直存在,不会因为客户端会话失效而删除;

  • PERSISTENT_SEQUENTIAL -- 持久顺序节点,zk会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;

  • EPHEMERAL -- 临时节点,客户端会话失效或连接关闭后,该节点会被自动删除,且不能在临时节点下创建子节点;

  • EPHEMERAL_SEQUENTIAL -- 临时顺序节点,特性同上。

代码实现

 
   
   
 
  1. public class DistributedLock implements Watcher {

  2.    private int threadId;

  3.    private ZooKeeper zk = null;

  4.    private String selfPath;

  5.    private String waitPath;

  6.    private String LOG_PREFIX_OF_THREAD;

  7.    private static final int SESSION_TIMEOUT = 10000;

  8.    private static final String GROUP_PATH = "/disLocks";

  9.    private static final String SUB_PATH = "/disLocks/sub";

  10.    private static final String CONNECTION_STRING = "192.168.136.86:2181";

  11.    private static final int THREAD_NUM = 10;

  12.    //确保连接zk成功;

  13.    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

  14.    //确保所有线程运行结束;

  15.    private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);

  16.    private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);

  17.    public DistributedLock(int id) {

  18.        this.threadId = id;

  19.        LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】";

  20.    }

  21.    public static void main(String[] args) {

  22.        for(int i=0; i < THREAD_NUM; i++){

  23.            final int threadId = i+1;

  24.            new Thread(){

  25.                @Override

  26.                public void run() {

  27.                    try{

  28.                        DistributedLock dc = new DistributedLock(threadId);

  29.                        dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);

  30.                        //GROUP_PATH不存在的话,由一个线程创建即可;

  31.                        //可不加锁,zk创建节点是阻塞的,无并发问题

  32.                        //synchronized (threadSemaphore){

  33.                            dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true);

  34.                        //}

  35.                        dc.getLock();

  36.                    } catch (Exception e){

  37.                        LOG.error("【第"+threadId+"个线程】 抛出的异常:");

  38.                        e.printStackTrace();

  39.                    }

  40.                }

  41.            }.start();

  42.        }

  43.        try {

  44.            threadSemaphore.await();

  45.            LOG.info("所有线程运行结束!");

  46.        } catch (InterruptedException e) {

  47.            e.printStackTrace();

  48.        }

  49.    }

  50.    /**

  51.     * 获取锁

  52.     * @return

  53.     */

  54.    private void getLock() throws KeeperException, InterruptedException {

  55.        selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

  56.        LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);

  57.        if(checkMinPath()){

  58.            getLockSuccess();

  59.        }

  60.    }

  61.    /**

  62.     * 创建节点

  63.     * @param path 节点path

  64.     * @param data 初始数据内容

  65.     * @return

  66.     */

  67.    public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {

  68.        if(zk.exists(path, needWatch)==null){

  69.            LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "

  70.                    + this.zk.create( path,

  71.                    data.getBytes(),

  72.                    ZooDefs.Ids.OPEN_ACL_UNSAFE,

  73.                    CreateMode.PERSISTENT )

  74.                    + ", content: " + data );

  75.        }

  76.        return true;

  77.    }

  78.    /**

  79.     * 创建ZK连接

  80.     * @param sessionTimeout Session超时时间

  81.     */

  82.    public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {

  83.        zk = new ZooKeeper( connectString, sessionTimeout, this);

  84.        connectedSemaphore.await();

  85.    }

  86.    /**

  87.     * 获取锁成功

  88.     */

  89.    public void getLockSuccess() throws KeeperException, InterruptedException {

  90.        if(zk.exists(this.selfPath,false) == null){

  91.            LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");

  92.            return;

  93.        }

  94.        LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");

  95.        Thread.sleep(2000);

  96.        LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);

  97.        zk.delete(this.selfPath, -1);

  98.        releaseConnection();

  99.        threadSemaphore.countDown();

  100.    }

  101.    /**

  102.     * 关闭ZK连接

  103.     */

  104.    public void releaseConnection() {

  105.        if ( this.zk !=null ) {

  106.            try {

  107.                this.zk.close();

  108.            } catch ( InterruptedException e ) {}

  109.        }

  110.        LOG.info(LOG_PREFIX_OF_THREAD + "释放连接");

  111.    }

  112.    /**

  113.     * 检查自己是不是最小的节点

  114.     * @return

  115.     */

  116.    public boolean checkMinPath() throws KeeperException, InterruptedException {

  117.        List<String> subNodes = zk.getChildren(GROUP_PATH, false);

  118.        Collections.sort(subNodes);

  119.        int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));

  120.        switch (index){

  121.            case -1:{

  122.                LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath);

  123.                return false;

  124.            }

  125.            case 0:{

  126.                LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath);

  127.                return true;

  128.            }

  129.            default:{

  130.                this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);

  131.                LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);

  132.                try{

  133.                    zk.getData(waitPath, true, new Stat());

  134.                    return false;

  135.                }catch(KeeperException e){

  136.                    if(zk.exists(waitPath,false) == null){

  137.                        LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");

  138.                        return checkMinPath();

  139.                    }else{

  140.                        throw e;

  141.                    }

  142.                }

  143.            }

  144.        }

  145.    }

  146.    public void process(WatchedEvent event) {

  147.        if(event == null){

  148.            return;

  149.        }

  150.        Event.KeeperState keeperState = event.getState();

  151.        Event.EventType eventType = event.getType();

  152.        if ( Event.KeeperState.SyncConnected == keeperState) {

  153.            if ( Event.EventType.None == eventType ) {

  154.                LOG.info( LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器" );

  155.                connectedSemaphore.countDown();

  156.            }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {

  157.                LOG.info(LOG_PREFIX_OF_THREAD + "收到情报,排我前面的家伙已挂,我是不是可以出山了?");

  158.                try {

  159.                    if(checkMinPath()){

  160.                        getLockSuccess();

  161.                    }

  162.                } catch (KeeperException e) {

  163.                    e.printStackTrace();

  164.                } catch (InterruptedException e) {

  165.                    e.printStackTrace();

  166.                }

  167.            }

  168.        }else if ( Event.KeeperState.Disconnected == keeperState ) {

  169.            LOG.info( LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接" );

  170.        }/* else if ( Event.KeeperState.AuthFailed == keeperState ) {

  171.            LOG.info( LOG_PREFIX_OF_THREAD + "权限检查失败" );

  172.        }*/ else if ( Event.KeeperState.Expired == keeperState ) {

  173.            LOG.info( LOG_PREFIX_OF_THREAD + "会话失效" );

  174.        }

  175.    }

  176. }







以上是关于分布式锁的实现方式的主要内容,如果未能解决你的问题,请参考以下文章

Redis分布式锁的实现方式

分布式锁的实现方式和优缺点&Java代码实现

80% 人不知道的 Redis 分布式锁的正确实现方式(Java 版)

分布式锁三种解决方案

详解锁,分布式锁的几种实现方式

分布式锁的两种实现方式(基于redis和基于zookeeper)