jdk源码解析--ReentryReadWriteLock类(下)
Posted 我的IT技术路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk源码解析--ReentryReadWriteLock类(下)相关的知识,希望对你有一定的参考价值。
在上文中,我们对读写锁大概有了一个了解,本节我们将深入去看下读写锁内部的具体实现,内部的同步器Sync是怎么实现读写锁的功能。现在我们先看下sync的内部实现。
1. abstract static class Sync extends AbstractQueuedSynchronizer{
2. private static final long serialVersionUID = 6317671515068378041L;
3. //以下使用位运算计算共享的数量
4. static final int SHARED_SHIFT = 16;
5. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
6. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
7. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
8.
9. /** Returns the number of shared holds represented in count */
10. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//返回共享的数量
11. /** Returns the number of exclusive holds represented in count */
12. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//返回独占的数量
13.
14. static final class HoldCounter {//一个由本地线程变量维护的缓存器,返回该线程占用的次数
15. int count = 0;
16. // Use id, not reference, to avoid garbage retention
17. final long tid = getThreadId(Thread.currentThread());
18. }
19. static final class ThreadLocalHoldCounter
20. extends ThreadLocal<HoldCounter> {//使用threadlocal实现计数器
21. public HoldCounter initialValue() {
22. return new HoldCounter();
23. }
24. }
25. private transient ThreadLocalHoldCounter readHolds;//缓存
26. private transient HoldCounter cachedHoldCounter;
27. private transient Thread firstReader = null;//第一个读线程
28. private transient int firstReaderHoldCount;//第一个读线程的锁住次数
29.
30. Sync() {
31. readHolds = new ThreadLocalHoldCounter();
32. setState(getState()); // 确保读线程可见
33. }
34. abstract boolean readerShouldBlock();//是否读阻塞
35. abstract boolean writerShouldBlock();//是否写阻塞
36. protected final boolean tryRelease(int releases) {//尝试释放
37. if (!isHeldExclusively())
38. throw new IllegalMonitorStateException();
39. int nextc = getState() - releases;//减去释放的次数
40. boolean free = exclusiveCount(nextc) == 0;//判断是否已经没有占用者
41. if (free)
42. setExclusiveOwnerThread(null);//如果没有占用者,全部释放
43. setState(nextc);//设置state的值
44. return free;//返回是否释放完
45. }
46.
47. protected final boolean tryAcquire(int acquires) {
48.
49. Thread current = Thread.currentThread();//获取当前线程
50. int c = getState();//获取占用次数
51. int w = exclusiveCount(c);//该线程的占用次数
52. if (c != 0) {//读锁的情况
53. // 如果读数量和写的数量不为0且占用者不是该线程,返回fail
54. if (w == 0 || current != getExclusiveOwnerThread())
55. return false;
56. //如果已经大于最大的占用量,返回fail
57. if (w + exclusiveCount(acquires) > MAX_COUNT)
58. throw new Error("Maximum lock count exceeded");
59. // 否则设置值
60. setState(c + acquires);
61. return true;
62. }
63. //写锁的判断
64. if (writerShouldBlock() ||
65. !compareAndSetState(c, c + acquires))//如果是写锁阻塞或者设置失败,返回不成功
66. return false;
67. setExclusiveOwnerThread(current);//设置当前占有者
68. return true;//返回成功
69. }
70. //共享的释放
71. protected final boolean tryReleaseShared(int unused) {
72. Thread current = Thread.currentThread();
73. if (firstReader == current) {//如果当前线程是第一个读者
74. // assert firstReaderHoldCount > 0;
75. if (firstReaderHoldCount == 1)//如果第一个读者的持有数量是1
76. firstReader = null;//设置firstReader 为null
77. else
78. firstReaderHoldCount--;//减1
79. } else {
80. HoldCounter rh = cachedHoldCounter;
81. if (rh == null || rh.tid != getThreadId(current))
82. rh = readHolds.get();//获取当前线程的计数器
83. int count = rh.count;//获取计数值
84. if (count <= 1) {//如果小于等于1
85. readHolds.remove();//清理当前的计数器,防止内存泄漏
86. if (count <= 0)
87. throw unmatchedUnlockException();
88. }
89. --rh.count;//否则减1
90. }
91. for (;;) {
92. int c = getState();
93. int nextc = c - SHARED_UNIT;
94. if (compareAndSetState(c, nextc))//CAS设置值
95. return nextc == 0;
96. }
97. }
98.
99. private IllegalMonitorStateException unmatchedUnlockException() {
100. return new IllegalMonitorStateException(
101. "attempt to unlock read lock, not locked by current thread");
102. }
103.
104. protected final int tryAcquireShared(int unused) {
105.
106.
107. Thread current = Thread.currentThread();
108. int c = getState();
109. if (exclusiveCount(c) != 0 &&
110. getExclusiveOwnerThread() != current)//如果不是当前线程,返回失败
111. return -1;
112. int r = sharedCount(c);//获取共享的占用数量
113. if (!readerShouldBlock() &&
114. r < MAX_COUNT &&
115. compareAndSetState(c, c + SHARED_UNIT)) {//读没阻塞且小于最大占用数且设置成功
116. if (r == 0) {//第一个占用者,设置第一个占用者的信息
117. firstReader = current;
118. firstReaderHoldCount = 1;
119. } else if (firstReader == current) {
120. firstReaderHoldCount++;//如果第一个占用者是当前线程,占用数+1
121. } else {//其他线程的话,就相应获取加1
122. HoldCounter rh = cachedHoldCounter;
123. if (rh == null || rh.tid != getThreadId(current))
124. cachedHoldCounter = rh = readHolds.get();
125. else if (rh.count == 0)
126. readHolds.set(rh);
127. rh.count++;
128. }
129. return 1;
130. }
131. return fullTryAcquireShared(current);//上面的操作不成功,可能是CAS失败,尝试更新之后再添加
132. }
133.
134.
135. final int fullTryAcquireShared(Thread current) {
136.
137. HoldCounter rh = null;
138. for (;;) {//死循环
139. int c = getState();
140. if (exclusiveCount(c) != 0) {
141. if (getExclusiveOwnerThread() != current)//不是当前线程
142. return -1;//返回失败
143. // else we hold the exclusive lock; blocking here
144. // would cause deadlock.
145. } else if (readerShouldBlock()) {//如果是读锁阻塞
146. // Make sure we're not acquiring read lock reentrantly
147. if (firstReader == current) {
148. // assert firstReaderHoldCount > 0;
149. } else {//当前线程不是第一个占用者
150. if (rh == null) {
151. rh = cachedHoldCounter;
152. if (rh == null || rh.tid != getThreadId(current)) {
153. rh = readHolds.get();
154. if (rh.count == 0)//如果为0,清除
155. readHolds.remove();
156. }
157. }
158. if (rh.count == 0)
159. return -1;
160. }
161. }
162. if (sharedCount(c) == MAX_COUNT)//占用超过最大值
163. throw new Error("Maximum lock count exceeded");
164. if (compareAndSetState(c, c + SHARED_UNIT)) {//以下和上面tryAcquireShared一样重复了过程
165. if (sharedCount(c) == 0) {
166. firstReader = current;
167. firstReaderHoldCount = 1;
168. } else if (firstReader == current) {
169. firstReaderHoldCount++;
170. } else {
171. if (rh == null)
172. rh = cachedHoldCounter;
173. if (rh == null || rh.tid != getThreadId(current))
174. rh = readHolds.get();
175. else if (rh.count == 0)
176. readHolds.set(rh);
177. rh.count++;
178. cachedHoldCounter = rh; // cache for release
179. }
180. return 1;
181. }
182. }
183. }
184.
185. /**
186. * Performs tryLock for write, enabling barging in both modes.
187. * This is identical in effect to tryAcquire except for lack
188. * of calls to writerShouldBlock.
189. */
190. final boolean tryWriteLock() {//尝试占用写锁,立马返回的
191. Thread current = Thread.currentThread();
192. int c = getState();
193. if (c != 0) {
194. int w = exclusiveCount(c);
195. if (w == 0 || current != getExclusiveOwnerThread())
196. return false;
197. if (w == MAX_COUNT)
198. throw new Error("Maximum lock count exceeded");
199. }
200. if (!compareAndSetState(c, c + 1))
201. return false;
202. setExclusiveOwnerThread(current);
203. return true;
204. }
205.
206. /**
207. * Performs tryLock for read, enabling barging in both modes.
208. * This is identical in effect to tryAcquireShared except for
209. * lack of calls to readerShouldBlock.
210. */
211. final boolean tryReadLock() {//尝试占用读锁,立马返回的,操作基本类似
212. Thread current = Thread.currentThread();
213. for (;;) {
214. int c = getState();
215. if (exclusiveCount(c) != 0 &&
216. getExclusiveOwnerThread() != current)
217. return false;
218. int r = sharedCount(c);
219. if (r == MAX_COUNT)
220. throw new Error("Maximum lock count exceeded");
221. if (compareAndSetState(c, c + SHARED_UNIT)) {
222. if (r == 0) {
223. firstReader = current;
224. firstReaderHoldCount = 1;
225. } else if (firstReader == current) {
226. firstReaderHoldCount++;
227. } else {
228. HoldCounter rh = cachedHoldCounter;
229. if (rh == null || rh.tid != getThreadId(current))
230. cachedHoldCounter = rh = readHolds.get();
231. else if (rh.count == 0)
232. readHolds.set(rh);
233. rh.count++;
234. }
235. return true;
236. }
237. }
238. }
239.
240. protected final boolean isHeldExclusively() {
241. // While we must in general read state before owner,
242. // we don't need to do so to check if current thread is owner
243. return getExclusiveOwnerThread() == Thread.currentThread();
244. }
245.
246. // Methods relayed to outer class
247.
248. final ConditionObject newCondition() {
249. return new ConditionObject();
250. }
251.
252. final Thread getOwner() {
253. // Must read state before owner to ensure memory consistency
254. return ((exclusiveCount(getState()) == 0) ?
255. null :
256. getExclusiveOwnerThread());
257. }
258.
259. final int getReadLockCount() {
260. return sharedCount(getState());
261. }
262.
263. final boolean isWriteLocked() {
264. return exclusiveCount(getState()) != 0;
265. }
266.
267. final int getWriteHoldCount() {
268. return isHeldExclusively() ? exclusiveCount(getState()) : 0;
269. }
270.
271. final int getReadHoldCount() {
272. if (getReadLockCount() == 0)
273. return 0;
274.
275. Thread current = Thread.currentThread();
276. if (firstReader == current)
277. return firstReaderHoldCount;
278.
279. HoldCounter rh = cachedHoldCounter;
280. if (rh != null && rh.tid == getThreadId(current))
281. return rh.count;
282.
283. int count = readHolds.get().count;
284. if (count == 0) readHolds.remove();
285. return count;
286. }
287.
288. /**
289. * Reconstitutes the instance from a stream (that is, deserializes it).
290. */
291. private void readObject(java.io.ObjectInputStream s)
292. throws java.io.IOException, ClassNotFoundException {
293. s.defaultReadObject();
294. readHolds = new ThreadLocalHoldCounter();
295. setState(0); // reset to unlocked state
296. }
297.
298. final int getCount() { return getState(); }
299. }
上面的核心在于共享和可重入值的计算,一个总共线程占用的次数state值,一个是当前线程占用的次数使用本地变量计算器统计。在看这里的源码要对这两个值理解清楚,上面的代码看起来就不是那么复杂。先看下公平和非公平的构造
1. static final class NonfairSync extends Sync {
2. private static final long serialVersionUID = -8159625535654395037L;
3. final boolean writerShouldBlock() {
4. return false; // 一直不阻塞
5. }
6. final boolean readerShouldBlock() {
7. /* As a heuristic to avoid indefinite writer starvation,
8. * block if the thread that momentarily appears to be head
9. * of queue, if one exists, is a waiting writer. This is
10. * only a probabilistic effect since a new reader will not
11. * block if there is a waiting writer behind other enabled
12. * readers that have not yet drained from the queue.
13. */
14. return apparentlyFirstQueuedIsExclusive();//判断队列的第一是否是独占的,如果是独占,那么读锁就应该阻塞,写锁也要阻塞,如果是共享的话,读锁可以共享,写锁阻塞
15. }
16. }
17. static final class FairSync extends Sync {
18. private static final long serialVersionUID = -2274990926593161451L;
19. final boolean writerShouldBlock() {
20. return hasQueuedPredecessors();//是否是第一个
21. }
22. final boolean readerShouldBlock() {
23. return hasQueuedPredecessors();//是否是第一个
24. }
25. }
下面的读写锁的代码就相对简单了,基本是调用上面的同步器执行相应的操作,在这里就不介绍了,有兴趣的读者可以查看jdk的源码。当然读写锁还有部分问题存在,读的过程不允许写,如果希望增加效率的话,允许读的过程,也可以写,可以看下下一文中介绍的StampedLock。
以上是关于jdk源码解析--ReentryReadWriteLock类(下)的主要内容,如果未能解决你的问题,请参考以下文章