Java Lock-同步的另一种实现

Posted JOE-1992

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Lock-同步的另一种实现相关的知识,希望对你有一定的参考价值。

通常初级的程序员喜欢使用synchronized关键字来实现同步机制,理由很简单,使用它简单,我们不用考虑更多的细节,对程序员的要求比较低。那这里我们介绍另外一种通过Lock实现的同步的方法,显然使用Lock方法,能够使程序并发更加高效、灵活,其对程序员的要求也就更高。

Lock中的方法

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

从Lock源码中我们可以看到它是一个接口。

  • lock() 如果能够获取锁,就返回,如果不能获取,它就会一直在等待获取锁。

  • lockInterruptibly() 支持线程中断,如果线程中断,默认它就不会去竞争了,这个相对于同步代码代码与同步方法而言,可扩展性还是大了很多。

  • tryLock() 如果能获取锁,就立马返回true,否则就返回false。

  • tryLock(long time, TimeUnit unit) 如果能获锁,就返回true,如果有以下两种情况,就会设置成false:
    一是线程被中断了;
    二是时间到了。

  • unlock() 释放一个锁。

  • Condition newCondition() 引入Condition有两个方法的意图:

    a.对一个共享资源有读和写的能力,如果读线程或写线程获取了Lock的权力,即有能力进入,但是如果里面没有内容,读也没有用,如果空间已满了,写也写不了,所有还得有条件去判断一下,是不是线程要等待了;

    b.提供一种多线程之间的通信机制,类似wait()和nofity()的原理。

锁相关概念介绍

1.可重入锁

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。

在JAVA环境下 ReentrantLock 和synchronized 都是可重入锁,可重入锁最大的作用是避免死锁。

class MyClass {
    public synchronized void method1() {
        method2();
    }

    public synchronized void method2() {

    }
}

在一个线程获得method1方法的锁后,进入method1,同样被synchronized修饰的method2方法就不需要再次去获取锁了。因为synchronized具有可重入性,所以进入直接运行。

2.可中断锁

可中断锁:即为可以实现中断的锁。

在Java中,synchronized就不是可中断锁,而Lock是可中断锁,这也是我们需要Lock实现同步的重要原因。

如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。

在前面Lock的接口方法lockInterruptibly()的用法时已经体现了Lock的可中断性。

3.公平锁

公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。

非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。

在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。

而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁,等下后面会有介绍。

Lock的实现

1.ReentrantLock

ReentrantLock,也就是可重入锁,前面已经介绍过可重入锁的概念。现在就从源码的层面来分析分析ReentrantLock,这将有助于我们更好的理解线程调度,锁的实现,掌握更多的高并发编程的思想。

ReentrantLock中lock() 方法实现:

public void lock() {
        sync.lock();
    }

sync的声明:

/** Synchronizer providing all implementation mechanics */
    private final Sync sync;

这部分看起来很简单,也很好理解,但是让你困惑的是sync是什么,它的内部是怎样实现的呢?下面我们来看看sync究竟是怎样实现的。通过进入lock源码我们发现:

这里写图片描述

Sync,FairSync和NonFairSync都是ReentrantLock的静态内部类。Sync 是一个抽象类,而FairSync和NonFairSync则是具体类,分别对应了公平锁和非公平锁。这里我们主要介绍非公平锁。

首先我们来看看最核心的AbstractQueuedSynchronizer 类,Sync继承了这个类,最重要的两个数据成员当前锁状态和等待链表都是由它来实现的。

/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
    /**
     * The synchronization state.
     */
    private volatile int state; // 记录了当前锁被锁定的次数

当state值为0,说明未被绑定,加锁通过更高state值来实现,而更改状态主要由函数compareAndSetState实现。调用cas原语以保证操作的原子性,如果state值为expect,则更新为update值且返回true,否则不更改state且返回false.

 /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

当前线程则是在AbstractOwnableSynchronizer中:

/**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

了解了这些基本的数据结构后,我们再来看sync.lock()的究竟,NonfairSync源码:

  /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {

            // 如果锁没有被任何线程锁定且加锁成功则设定当前线程为锁的拥有者
            // 如果锁已被当前线程锁定,则在acquire中将状态加1并返回
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 加锁失败,再次尝试加锁,失败则加入等待队列,禁用当前线程,直到被中断或有线程释放锁时被唤醒
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

acquire方法在AbstractQueuedSynchronizer中的实现:

 /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */

    public final void acquire(int arg) {
        // 首先尝试获取锁,成功则直接返回
        // 否则将当前线程加入锁的等待队列并禁用当前线程
        // 直到线程被中断或者在锁为其它线程释放时唤醒
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

尝试获得锁,调用nonfairTryAcquire(acquires);方法,该方法的实现如下:

/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 如果锁空闲则尝试锁定,成功则设当前线程为锁拥有者
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 若当前线程为锁拥有者则直接修改锁状态计数
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 尝试获取锁失败,返回false
            return false;
        }

在tryAcquire失败后则进行如下操作

第一步调用AbstractQueuedSynchronizer.addWaiter将当前线程加入等待队列尾部。

/**
     * Creates and enqueues node for given thread and mode.
     *
     * @param current the thread
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

第二步调用AbstractQueuedSynchronizer.acquireQueued让线程进入禁用状态,并在每次被唤醒时尝试获取锁,失败则继续禁用线程。

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                // 如果当前线程是head的直接后继则尝试获取锁
                // 这里不会和等待队列中其它线程发生竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争。这是非公平锁和公平锁的一个重要区别。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                // 如果不是head直接后继或获取锁失败,则检查是否要禁用当前线程
                // 是则禁用,直到被lock.release唤醒或线程中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

AbstractQueuedSynchronizer. shouldParkAfterFailedAcquire做了一件很重要的事:根据状态对等待队列进行清理,并设置等待信号。

这里需要先说明一下waitStatus,它是AbstractQueuedSynchronizer的静态内部类Node的成员变量,用于记录Node对应的线程等待状态.等待状态在刚进入队列时都是0,如果等待被取消则被设为Node.CANCELLED,若线程释放锁时需要唤醒等待队列里的其它线程则被置为Node.SIGNAL,还有一种状态Node.CONDITION这里先不讨论。

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;

AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire实现如下:

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int s = pred.waitStatus;
        if (s < 0)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            // 如果前置结点waitStatus已经被置为SIGNAL,则返回true,可以禁用线程
            return true;
        if (s > 0) {
            // 如果前置结果已被CALCEL,则移除。
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
        node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
        else
            /*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             */
            // 原子性将前置结点waitStatus设为SIGNAL
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        // 这里一定要返回false,有可能前置结点这时已经释放了锁,但因其waitStatus在释放锁时还未被置为SIGNAL而未触发唤醒等待线程操作,因此必须通过return false来重新尝试一次获取锁
        return false;
    }

AbstractQueuedSynchronizer.parkAndCheckInterrupt实现如下,很简单,直接禁用线程,并等待被唤醒或中断发生。对java中Thread.interrupted()都作了什么不甚了解的要做功课。

这里线程即被堵塞,醒来时会重试获取锁,失败则继续堵塞。即使Thread.interrupted()也无法中断。那些想在等待时间过长时中断退出的线程可以调用ReentrantLoc.lockInterruptibly().

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

ReadWriteLock

java.util.concurrent.locks.ReadWriteLock 读写锁是一个接口类,我们知道读写锁的概念,它允许多个线程同一时间对特定资源读取,但只允许一个线程对资源进行写操作。

其接口定义:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}
  • 读锁:当没有写操作线程锁定ReadWriteLock ,并且没有任何线程要求获得写操作,这时候允许多个读线程进行锁定。

  • 写锁:当没有任何线程读操作或者写操作时,允许唯一线程进行写操作锁定。

2.ReentrantReadWriteLock

ReadWriteLock(读写锁)的概念了解后,我们要深入研究它的实现ReentrantReadWriteLock 。可重入的概念以及实现在以上内容已经有详细的介绍,这里主要介绍ReentrantReadWriteLock 它的两个主要的readLock()和writeLock()内部类的实现。

ReadLock 内部实现:

    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;

        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        public void lock() {
            sync.acquireShared(1);
        }

        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        public boolean tryLock() {
            return sync.tryReadLock();
        }

        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        public void unlock() {
            sync.releaseShared(1);
        }

        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }

        public String toString() {
            int r = sync.getReadLockCount();
            return super.toString() +
                "[Read locks = " + r + "]";
        }
    }

WriteLock 实现:

    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;

        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
    }

        public void lock() {
            sync.acquire(1);
        }

        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        public void unlock() {
            sync.release(1);
        }

        public Condition newCondition() {
            return sync.newCondition();
        }

        public String toString() {
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                                       "[Unlocked]" :
                                       "[Locked by thread " + o.getName() + "]");
        }

        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }

        public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
    }

简单的从类方法的声明上,我们很难看出读写锁的区别,这里我们分析它们直接不同的点。

(1)trylock() 获取锁

tryReadLock() 的源码实现:

        /**
         * Performs tryLock for read, enabling barging in both modes.
         * This is identical in effect to tryAcquireShared except for
         * lack of calls to readerShouldBlock.
         */
        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                // 如果有写锁占用并且当前线程不是setExclusiveOwnerThread,返回false
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);// 获得共享锁的数量
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

tryWriteLock() 的源码实现:

        /**
         * Performs tryLock for write, enabling barging in both modes.
         * This is identical in effect to tryAcquire except for lack
         * of calls to writerShouldBlock.
         */
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                // 独占持有树为0,或者当前线程没有获得setExclusiveOwnerThread,返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT) // 写线程超过最大数
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

从源码可以看出,ReadLock获取的是共享锁,WriteLock获取的是独占锁。WriteLock的实现与ReentrantLock里面的实现几乎相同,都是使用了AQS的acquire/release操作。

state字段(int类型,32位)用来描述有多少线程获持有锁。在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数),在共享锁的时代就是持有锁的数量。

但在这里就需要两个变量来描述读写锁不同的数量,在ReentrantReadWrilteLock里面将这个字段一分为二,高位16位表示共享锁的数量,低位16位表示独占锁的数量(或者重入数量)。2^16-1=65536,这就是上节中提到的为什么共享锁和独占锁的数量最大只能是65535的原因了。

/*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

感谢

学习源码的时候,虽然有些东西看的很吃力,但却让你能够获得不少的知识,你可以了解Java大师们是如何写好这些高效的并发程序的。这里也感谢以下博主:

Java并发编程:Lock

ReentrantLock代码剖析之ReentrantLock.lock

JAVA LOCK代码浅析

以上是关于Java Lock-同步的另一种实现的主要内容,如果未能解决你的问题,请参考以下文章

java实现自定义同步组件的过程

Java - “JUC”锁

显式锁和AQS

JUC系列LOCK框架系列之四 同步工具类Semaphore

JAVA基础——内部类详解

ConCurrent并发包 - Lock详解(转)