从阅读ReentrantLock 源码到实现自己的分布式锁

Posted 左沩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从阅读ReentrantLock 源码到实现自己的分布式锁相关的知识,希望对你有一定的参考价值。

    由于公司现在的架构师微服务,每个服务都需要进行分布式部署,对于一些功能,可能就需要考虑用分布式锁,分布式锁的实现方案有很多种,为了更升入的理解,楼主考虑深度的学习下jdk的可重入锁ReentrantLock

   打开ReentrantLock的源码便发现 它内部实现了aqs,通过继承aqs实现了公平锁Sync,非公平锁NonfairSync

说到这我介绍下什么是aqs

aqs 是一个抽象队列同步器,设计模式是模板模式。
核心数据结构:双向链表 + state(锁状态)
底层操作:CAS

首先介绍以下它的设计模式,我们加锁的时候调用 lock()方法,如下代码

  final void lock() 
            acquire(1);
        
    public final void acquire(int arg) 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    

 protected boolean tryAcquire(int arg) 
        throw new UnsupportedOperationException();
    

通过代码跟踪,我们发现acquire 里面的tryAcquire 方法并没有实现,但定义了一个完整的流程,tryacquire 是在子类里面实现的

我们通过跟踪代码来看一下 公平锁的实现

   static final class FairSync extends Sync 
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() 
            acquire(1);
        

        protected final boolean tryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
    

这段代码我们看到了我们上面提到的state, state是干嘛用的呢?

 通过设置state状态标识锁有没有被占用,当state>0 时,表示锁被占用,当state=0 时,表示锁空闲,可以被获取,state是一个全局变量,设计到可见性问题,因此我们看源码

private volatile int state; 定义state 用了 volatile 保证可见性

我们在研究这段代码 当state=0 时,并且当前线程在头节点时,便可以获取锁,并设置新的state 状态, compareAndSetState(0, acquires)),我们深入这个代码

   /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a @code volatile read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return @code true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) 
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    

通过这段代码我们发现设置state 的状态使用的就是乐观锁,当前值是否与预期值相同,相同就设置成功。

在acquire 方法中,当获取锁失败时,我们知道一般会做持续获取锁,直到超时时间,才会返回获取锁失败,

那么看下本次获取锁失败后执行的方法是acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

方法,我们通过源码来看下,源码是如何操作的

    private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        enq(node);
        return node;
      
    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
      

final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

通过源码我们可以看到

  1. addWaiter:通过自旋CAS,将当前线程加入上面锁的双向链表(等待队列)中。
  2. acquireQueued:通过自旋,判断当前队列节点是否可以获取锁。

通过上面的分析,那么我们来分析通过reids 实现一个分布式锁,如何实现,以及有哪些注意事项?

我们通过对redis 设置key ,当key不存在时,可以获取锁,当key存在时,表示锁已获取,并且reids单线程操作,数据通过卡槽分布,不存在线程安全问题。

那么有哪些注意事项呢

1.加锁过程必须设置过期时间,加锁和设置过期时间过程必须是原子操作
如果没有设置过期时间,那么就发生死锁,锁永远不能被释放。如果加锁后服务宕机或程序崩溃,来不及设置过期时间,同样会发生死锁。
2.解锁必须是解除自己加上的锁
试想一个这样的场景,服务A加锁,但执行效率非常慢,导致锁失效后还未执行完,但这时候服务B已经拿到锁了,这时候服务A执行完毕了去解锁,把服务B的锁给解掉了,其他服务C、D、E...都可以拿到锁了,这就有问题了。加锁的时候我们可以设置唯一value,解锁时判断是不是自己先前的value就行了

public class RedisLock 

    /**
     * 解锁脚本,原子操作
     */
    private static final String unlockScript =
            "if redis.call(\\"get\\",KEYS[1]) == ARGV[1]\\n"
                    + "then\\n"
                    + "    return redis.call(\\"del\\",KEYS[1])\\n"
                    + "else\\n"
                    + "    return 0\\n"
                    + "end";

    private StringRedisTemplate redisTemplate;

    public RedisLock(StringRedisTemplate redisTemplate) 
        this.redisTemplate = redisTemplate;
    

    /**
     * 加锁,有阻塞
     * @param name
     * @param expire
     * @param timeout
     * @return
     */
    public String lock(String name, long expire, long timeout)
        long startTime = System.currentTimeMillis();
        String token;
        do
            token = tryLock(name, expire);
            if(token == null) 
                if((System.currentTimeMillis()-startTime) > (timeout-50))
                    break;
                try 
                    Thread.sleep(50); //try 50 per sec
                 catch (InterruptedException e) 
                    e.printStackTrace();
                    return null;
                
            
        while(token==null);

        return token;
    

    /**
     * 加锁,无阻塞
     * @param name
     * @param expire
     * @return
     */
    public String tryLock(String name, long expire) 
        String token = UUID.randomUUID().toString();
        RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
        RedisConnection conn = factory.getConnection();
        try
            Boolean result = conn.set(name.getBytes(Charset.forName("UTF-8")), token.getBytes(Charset.forName("UTF-8")),
                    Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT);
            if(result!=null && result)
                return token;
        finally 
            RedisConnectionUtils.releaseConnection(conn, factory);
        
        return null;
    

    /**
     * 解锁
     * @param name
     * @param token
     * @return
     */
    public boolean unlock(String name, String token) 
        byte[][] keysAndArgs = new byte[2][];
        keysAndArgs[0] = name.getBytes(Charset.forName("UTF-8"));
        keysAndArgs[1] = token.getBytes(Charset.forName("UTF-8"));
        RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
        RedisConnection conn = factory.getConnection();
        try 
            Long result = (Long)conn.scriptingCommands().eval(unlockScript.getBytes(Charset.forName("UTF-8")), ReturnType.INTEGER, 1, keysAndArgs);
            if(result!=null && result>0)
                return true;
        finally 
            RedisConnectionUtils.releaseConnection(conn, factory);
        

        return false;
    

参考文档

https://www.jianshu.com/p/7b0e11a1e605

redis实现分布式锁

https://www.cnblogs.com/heyanan/p/12800123.html

以上是关于从阅读ReentrantLock 源码到实现自己的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Java并发——ReentrantLock类源码阅读

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

Java多线程——ReentrantLock源码阅读

ReentrantLock实现原理

ReentrantLock源码分析