jdk源码解析--Exchanger类
Posted 我的IT技术路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk源码解析--Exchanger类相关的知识,希望对你有一定的参考价值。
本文要介绍一个在多线程中不常用的类,线程间数据交换类Exchanger,它允许两个线程在运行到同一个位置进行数据交换。我们定义一个数据交换器,当一个线程先执行到交互时只能阻塞等待另一个线程也走到这里,然后进行安全的数据交换,这里有个限制,就是只允许两个线程的数据进行交换。所以在实际中并不常用这个类。下面我们先通过一个demo来看下其具体使用。
1. import java.util.Calendar;
2. import java.util.concurrent.Exchanger;
3. import java.util.stream.IntStream;
4.
5. public class ExchangerTest {
6. public static void main(String[] args) {
7. int size = 2;//创建两个线程进行交换
8. Exchanger<String> exchanger = new Exchanger<String>();//创建一个交换地
9. IntStream.range(0,size).forEach(i-> new Thread(new Hander(exchanger)).start());
10.
11. }
12. private static class Hander implements Runnable{
13. private Exchanger<String> exchanger;
14. public Hander(Exchanger<String> exchanger) {
15. this.exchanger = exchanger ;
16. }
17. @Override
18. public void run() {
19. try {
20. String exT = exchanger.exchange(Thread.currentThread().getName());
21. System.out.println(Calendar.getInstance().getTime() +" "+ Thread.currentThread().getName()+" get "+exT );//将自己的线程名和另一个线程进行交换
22. } catch (InterruptedException e) {
23. }
24. }
25. }
26. }
上面就是创建了两个线程进行数据交换,线程1和线程2的线程名进行互换,结果如下:
所以上面线程0交换得到线程1的名字,线程1交换得到线程0的名字。看完上面的例子,Exchanger的实现思路分为单槽交换和多槽交换,当线程的并发量高的时候,将会有单槽转换为多槽进行分组交换,我们看下其内部具体的核心是实现代码:
1. public class Exchanger<V> {
2. static final class Node {
3. //多槽交换的变量
4. int index; // 多槽数组的索引
5. int bound; // 上一次记录交换边界线的值
6. int collides; // cas设置失败的次数
7. //单槽交换的变量
8. int hash; // 自旋的伪随机数
9. Object item; // 当前线程的数值
10. volatile Object match; // 后到达线程的数据值
11. volatile Thread parked; // 先到并等待阻塞的线程
12. }
13. //记录当前值,threadLocal值
14. static final class Participant extends ThreadLocal<Node> {
15. public Node initialValue() { return new Node(); }
16. }
17. private volatile Node[] arena;//多槽数组
18. private volatile Node slot;//单槽的节点
19. private volatile int bound;//多槽交换的界限
20. public Exchanger() {//构造函数就是初始化了一个本地节点变量
21. participant = new Participant();
22. }
23. public V exchange(V x) throws InterruptedException {
24. Object v;
25. Object item = (x == null) ? NULL_ITEM : x; // translate null args
26. if (
27. (arena != null ||(v = slotExchange(item, false, 0L)) == null) &&//多槽为空进行单槽交换
28. ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))//多槽不为空,且现场未中断,进行多槽交换
29. )
30. throw new InterruptedException();//抛出异常
31. return (v == NULL_ITEM) ? null : (V)v;//返回新值
32. }
33. //单槽交换
34. private final Object slotExchange(Object item, boolean timed, long ns){
35. Node p = participant.get();//从threadlocal中取出值
36. Thread t = Thread.currentThread();
37. if (t.isInterrupted()) // preserve interrupt status so caller can recheck
38. return null;
39.
40. for (Node q;;) {
41. if ((q = slot) != null) {//其他交换线程先到了
42. if (U.compareAndSwapObject(this, SLOT, q, null)) {//进行值交换
43. Object v = q.item;
44. q.match = item;
45. Thread w = q.parked;
46. if (w != null)
47. U.unpark(w);//交换成功,唤醒等待的线程
48. return v;
49. }
50. // 交换失败,进行创建多槽交换数组,设置bound的值为seq
51. if (NCPU > 1 && bound == 0 &&
52. U.compareAndSwapInt(this, BOUND, 0, SEQ))
53. arena = new Node[(FULL + 2) << ASHIFT];
54. }
55. else if (arena != null)//arena 不为空说明应该进行多槽交换,直接 退出进行多槽交换
56. return null; // caller must reroute to arenaExchange
57. else {
58. p.item = item;
59. if (U.compareAndSwapObject(this, SLOT, null, p))//当前线程先到,占用slot值
60. break;
61. p.item = null;
62. }
63. }
64.
65. // 等待其他线程到达,并且唤醒当前线程
66. int h = p.hash;
67. long end = timed ? System.nanoTime() + ns : 0L;
68. int spins = (NCPU > 1) ? SPINS : 1;
69. Object v;
70. while ((v = p.match) == null) {//等待的线程没到,进行随机自旋
71. if (spins > 0) {
72. h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
73. if (h == 0)
74. h = SPINS | (int)t.getId();
75. else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
76. Thread.yield();
77. }
78. else if (slot != p)
79. spins = SPINS;
80. else if (!t.isInterrupted() && arena == null &&
81. (!timed || (ns = end - System.nanoTime()) > 0L)) {
82. U.putObject(t, BLOCKER, this);//自旋时间过久,直接阻塞当前线程
83. p.parked = t;
84. if (slot == p)
85. U.park(false, ns);
86. p.parked = null;
87. U.putObject(t, BLOCKER, null);
88. }
89. else if (U.compareAndSwapObject(this, SLOT, p, null)) {//超时或者取消了,把slot让出来
90. v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
91. break;
92. }
93. }
94. U.putOrderedObject(p, MATCH, null);
95. p.item = null;
96. p.hash = h;
97. return v;
98. }
99. //多槽自旋
100. private final Object arenaExchange(Object item, boolean timed, long ns) {
101. Node[] a = arena;
102. Node p = participant.get();//当前线程的数据值
103. for (int i = p.index;;) { // 数组值的索引
104. int b, m, c; long j; // j is raw array offset
106. Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
107. if (q != null && U.compareAndSwapObject(a, j, q, null)) {//q不是空的,说明已有线程占用
108. Object v = q.item; // 和单槽一样交换并唤醒
109. q.match = item;
110. Thread w = q.parked;
111. if (w != null)
112. U.unpark(w);
113. return v;
114. }
115. else if (i <= (m = (b = bound) & MMASK) && q == null) {//有槽位置,但是为空,和上面的一样,先自旋等待,然后阻塞
116. p.item = item; // offer
117. if (U.compareAndSwapObject(a, j, null, p)) {//占槽
118. long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
119. Thread t = Thread.currentThread(); // wait
120. for (int h = p.hash, spins = SPINS;;) {
121. Object v = p.match;
122. if (v != null) {//有配对的到达,交换之后返回
123. U.putOrderedObject(p, MATCH, null);
124. p.item = null; // clear for next use
125. p.hash = h;
126. return v;
127. }
128. else if (spins > 0) {//进行自旋次数的操作
129. h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
130. if (h == 0) // initialize hash
131. h = SPINS | (int)t.getId();
132. else if (h < 0 && // approx 50% true
133. (--spins & ((SPINS >>> 1) - 1)) == 0)
134. Thread.yield(); // two yields per wait
135. }
136. else if (U.getObjectVolatile(a, j) != p)
137. spins = SPINS; // releaser hasn't set match yet
138. else if (!t.isInterrupted() && m == 0 &&
139. (!timed ||
140. (ns = end - System.nanoTime()) > 0L)) {//阻塞当前
141. U.putObject(t, BLOCKER, this); // emulate LockSupport
142. p.parked = t; // minimize window
143. if (U.getObjectVolatile(a, j) == p)
144. U.park(false, ns);
145. p.parked = null;
146. U.putObject(t, BLOCKER, null);
147. }
148. else if (U.getObjectVolatile(a, j) == p &&
149. U.compareAndSwapObject(a, j, p, null)) {
150. if (m != 0) // try to shrink
151. U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);//尝试缩减卡长度
152. p.item = null;
153. p.hash = h;
154. i = p.index >>>= 1; // descend
155. if (Thread.interrupted())//被中断
156. return null;
157. if (timed && m == 0 && ns <= 0L)//超时返回
158. return TIMED_OUT;
159. break; // expired; restart
160. }
161. }
162. }
163. else
164. p.item = null; // clear offer
165. }
166. else {//无效的卡槽位置,下面的需要扩容
167. if (p.bound != b) { // stale; reset
168. p.bound = b;
169. p.collides = 0;
170. i = (i != m || m == 0) ? m : m - 1;
171. }
172. else if ((c = p.collides) < m || m == FULL ||
173. !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {//进行扩容操作
174. p.collides = c + 1;
175. i = (i == 0) ? m : i - 1; // cyclically traverse
176. }
177. else
178. i = m + 1; // grow
179. p.index = i;
180. }
181. }
182. }
183.
184. }
核心的方法在于单槽交换和多槽交换,实现的交换流程主要有以下几个步骤:
1. 先到的进行自旋等待
2. 如果等待不到开始阻塞当前线程
3. 如果超时或者中断直接返回
4. 后到的直接进行匹配交换
在实际中,我个人不怎么使用exchanger类,不过里面提供的思想在很多java 类中都有体现,这个是值得学习的,ok,本文的内容就到这里了,该类其余的内容读者可自行查看。
以上是关于jdk源码解析--Exchanger类的主要内容,如果未能解决你的问题,请参考以下文章