AbstractQueuedSynchronizer和ReentranLock基本原理
Posted chongcheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AbstractQueuedSynchronizer和ReentranLock基本原理相关的知识,希望对你有一定的参考价值。
先把我主要学习参考的文章放上来先,这篇文章讲的挺好的,分析比较到位,最好是先看完这篇文章,在接下去看我写的。不然你会一脸懵逼,不过等你看完这篇文章,可能我的文章对你也用途不大了
深入分析AbstractQueuedSynchronizer独占锁的实现原理:ReentranLock
小弟我也是刚开始研究这个Lock锁相关,如果哪里写的有问题,希望各位大哥大姐帮忙指出,谢谢。
正文:
AbstractQueuedSynchronizer(简称AQS),抽象队列同步器,AQS是整个Concurrent包中最核心的地方,其它的并发工具也都是使用AQS来实现的,我们所熟悉的ConcurrentHashMap也是在这玩意的基础上实现的同步。
在深入学习之前,我们先来看看Concurrent包里面有什么东东:
这篇文章只会讲AbstractQueuedSynchronizer和ReentrantLock,而且也只讲加锁和放锁。
大概说一下加锁的原理:
AbstractQueuedSynchronizer 有一个state字段,当这个字段为0时,就代表当前锁对象可以被线程获取到锁;相反,如果state这个字段不为0,那当前锁对象就不能被线程获取到锁。state这个字段就相当于sychronized关键字所用的monitor。
AbstractQueuedSynchronizer有一个链表队列,这个队列每个节点都是一个等待获取锁的线程节点。
当线程尝试获取锁时,先直接尝试获取锁(也就是判断state是不是为0),如果获取不到锁,那就向队尾插入一个新节点,并且当前线程不断循环去获取锁(一定次数获取不到就会挂起线程,直到被别的线程唤醒)。
AbstractQueuedSynchronizer的state字段是用volatile进行修饰的(保证了对所有线程可见),同时对state的修改是使用compareAndSet进行乐观锁修改的,正是因为这两点,才使得不用sychronized也能在多线程下保证线程安全。
释放锁:
释放锁其实更简单,就是讲 state减少至0,正常来讲,每次加锁都是对state进行加1操作,对应的每次释放锁都是对state进行减1操作。
一个线程可以对一个锁进行重复次加锁,state可以一直加加加,但是当这个线程不在需要这个锁时,就必须将这个锁的state减少至0,也就是说你用了多少次lock(),就必须用多少次unLock(),否则这把锁就出错了,报废了,
再也没有线程可以获取到锁了,所有线程都卡死在这了。这一点和sychronized不同,sychronized会自动加锁放锁,但是lock必须自己手动进行。
另外,当一个线程将锁都释放完后,会唤醒其他等待这把锁,但是被挂起了的线程,让他们重新去获取锁。
AbstractQueuedSynchronizer里面比较关键的代码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{ protected AbstractQueuedSynchronizer() { } /** * 这玩意会形成一个双向队列连表,用于保存等待锁的线程。 * 队列首节点就是获取到锁的线程节点,新增线程请求锁,会加入到链表尾 * */ static final class Node { /** 共享模式节点 */ static final Node SHARED = new Node(); /** 独占模式节点 */ static final Node EXCLUSIVE = null; // 线程节点的状态 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor‘s thread needs unparking */ // 只有当前节点的前一个节点为SIGNAL时,才能当前节点才能被挂起。 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn‘t need to * signal. So, most code doesn‘t need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */
// 线程的等待状态 volatile int waitStatus; volatile Node prev; volatile Node next; /** * 当前节点的线程,也就是请求锁的线程 */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * 获取前一个节点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * 链表队列首节点 */ private transient volatile Node head; /** * 链表队列尾节点 */ private transient volatile Node tail; /** * 同步状态值,注意,这个值就是锁的关键,也就是线程打生打死都在抢的东西 * 我们所谓的获取锁,就是state这个值为0的时候给他加1 * 释放锁就是把state的值减1 * 当这个值重新变成0时,锁就被彻底释放,其他线程就可以获取锁了。 * 如果state不为0,那是不管怎样都抢不到锁的 * * 可能会有人问,state仅用volatile修饰,还是线程不安全的。所以,所有关于volatile修饰的变量 * 在这里都是通过compareAndSet来进行修改的 */ private volatile int state; /** * 独占锁的线程,这个属性是父类AbstractOwnableSynchronizer的,这里需要注意 * 在线程抢到锁后会将这个属性设成自己的线程 * */ private transient Thread exclusiveOwnerThread; /** * 请求获取锁。 * AbstractQueuedSynchronizer请求锁是非公平锁。 * 什么是公平锁什么是非公平锁呢? * 公平锁就是先来先拿锁,后来得排队。 * 非公平锁就是后来者先尝试插队取锁,如果别人还没用完锁并归还,那他就只能乖乖排队。 * 两者的唯一差别就是非公平锁会先进行一次获取锁操作,失败就和公平锁一样排队获取。 * */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果插不了队取锁,就新增一个节点去连表队列尾部排队获取锁 selfInterrupt(); } /** * 这个方法是子类必须实现的,用于具体获取锁 * */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 队列节点自旋获取锁。 * 什么叫自旋呢?自旋就是自己一直循环执行 * */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); /** * 在队列里获取锁是按先后顺序进行获取的 * 因为队列首节点就是获取到锁的线程,所以首节点后的一个节点就是下一个获取锁的线程,所以这里要求p == head, * 这样即便多线程下,别的线程因为不是第二个节点,所以也不能去抢锁 * 另外首节点有可能还在执行,也有可能执行完了并释放了锁,所以还要尝试获取一次锁tryAcquire(arg) * */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果抢不到锁,他会判断自己这个线程是否需要进行挂起,不浪费CPU资源 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 抢锁失败后判断是否需要挂起当前线程 * 他是根据前一个节点进行判断的 * */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 只有前一个节点为SIGNAL的节点,才会将线程挂起,其他线程等待状态都会在尝试一次 // 绝大多数线程在多次获取不到锁后,都会被置为SIGNAL这个状态,然后挂起 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don‘t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 挂起线程,让他休眠,有点类似于wait方法,CPU将不再调用他,需要对应的unpark方法进行唤醒,unpark会在获取到锁的线程 * 释放锁或者取消后才调用 * */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } /** * 释放锁,如果释放锁成功了,就会调用unparkSuccessor来唤醒其他休眠的线程 * */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * 释放锁的具体方法,子类必须实现 * */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /***/ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从链表尾开始一路往前遍历,找到最前面的非cancelled的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒线程 if (s != null) LockSupport.unpark(s.thread); } }
ReentrantLock 的一些关键代码:
public class ReentrantLock implements Lock, java.io.Serializable { /** * 同步器,继承自AbstractQueuedSynchronizer * */ private final Sync sync; /** * 这个锁同步器可以是公平锁也可以非公平锁 * */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 子类必须实现上锁方法 */ abstract void lock(); /** * 非公平锁获取锁 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 释放锁,释放锁没有公平和非公平的说法,都一样 * */ protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } /** * 非公平锁同步器 * */ static final class NonfairSync extends Sync { /** * 获取锁,这个方法会一直阻塞,知道获取到锁为止,或者被取消 */ final void lock() { // 因为是非公平的,所以会先尝试一次获取锁,失败就调用父类的acquire方法排队获取 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /** * 尝试获取锁,只尝试一次,成功就成功,失败就失败 * */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 公平锁同步器 * */ static final class FairSync extends Sync { /** * 因为这是公平锁,所以他直接就进入队列进行排队 * */ final void lock() { /** * 当然,因为AbstractQueuedSynchronizer的acquire本身就是会尝试先获取一次锁,失败才排队,所以严格来说他还是非公平的 * */ acquire(1); } /** * 如果没有别的线程在排队,他才会去获取锁,否则都不获取 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } /** * ReentrantLock默认就是非公平锁 * */ public ReentrantLock() { sync = new NonfairSync(); } /** * ReentrantLock也支持公平锁 * */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * 获取锁 * */ public void lock() { sync.lock(); } /** * 尝试加锁 * */ public boolean tryLock() { return sync.nonfairTryAcquire(1); } /** * 释放锁,这里是直接用AbstractQueuedSynchronizer的释放方法 * */ public void unlock() { sync.release(1); } }
以上是关于AbstractQueuedSynchronizer和ReentranLock基本原理的主要内容,如果未能解决你的问题,请参考以下文章