jdk源码解析--Exchanger类

Posted 我的IT技术路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk源码解析--Exchanger类相关的知识,希望对你有一定的参考价值。

本文要介绍一个在多线程中不常用的类,线程间数据交换类Exchanger它允许两个线程在运行到同一个位置进行数据交换。我们定义一个数据交换器,当一个线程先执行到交互时只能阻塞等待另一个线程也走到这里,然后进行安全的数据交换,这里有个限制,就是只允许两个线程的数据进行交换。所以在实际中并不常用这个类。下面我们先通过一个demo来看下其具体使用。

1. import java.util.Calendar;  

2. import java.util.concurrent.Exchanger;  

3. import java.util.stream.IntStream;  

4.   

5. public class ExchangerTest {  

6.     public static void main(String[] args)  {  

7.         int size = 2;//创建两个线程进行交换  

8.         Exchanger<String> exchanger = new Exchanger<String>();//创建一个交换地  

9.         IntStream.range(0,size).forEach(i-> new Thread(new Hander(exchanger)).start());  

10.   

11.     }  

12.     private static class Hander implements Runnable{  

13.         private Exchanger<String>  exchanger;  

14.         public Hander(Exchanger<String>  exchanger) {  

15.             this.exchanger = exchanger ;  

16.         }  

17.         @Override  

18.         public void run() {  

19.             try {  

20.                 String exT = exchanger.exchange(Thread.currentThread().getName());  

21.                 System.out.println(Calendar.getInstance().getTime() +" "+ Thread.currentThread().getName()+" get "+exT );//将自己的线程名和另一个线程进行交换  

22.             } catch (InterruptedException e) {  

23.             }  

24.         }  

25.     }  

26. }  

上面就是创建了两个线程进行数据交换,线程1和线程2的线程名进行互换,结果如下:

 


所以上面线程0交换得到线程1的名字,线程1交换得到线程0的名字。看完上面的例子,Exchanger的实现思路分为单槽交换和多槽交换,当线程的并发量高的时候,将会有单槽转换为多槽进行分组交换,我们看下其内部具体的核心是实现代码:

1. public class Exchanger<V> {  

2.    static final class Node {  

3.         //多槽交换的变量  

4.         int index;              // 多槽数组的索引  

5.         int bound;              // 上一次记录交换边界线的值  

6.         int collides;           // cas设置失败的次数  

7.         //单槽交换的变量  

8.         int hash;               // 自旋的伪随机数  

9.         Object item;            // 当前线程的数值  

10.         volatile Object match;  // 后到达线程的数据值  

11.         volatile Thread parked; // 先到并等待阻塞的线程  

12.     }  

13.     //记录当前值,threadLocal值  

14.     static final class Participant extends ThreadLocal<Node> {  

15.         public Node initialValue() { return new Node(); }  

16.     }  

17.     private volatile Node[] arena;//多槽数组  

18.     private volatile Node slot;//单槽的节点  

19.     private volatile int bound;//多槽交换的界限  

20.     public Exchanger() {//构造函数就是初始化了一个本地节点变量  

21.         participant = new Participant();  

22.     }  

23.     public V exchange(V x) throws InterruptedException {  

24.         Object v;  

25.         Object item = (x == null) ? NULL_ITEM : x; // translate null args  

26.         if (  

27.              (arena != null ||(v = slotExchange(item, false, 0L)) == null) &&//多槽为空进行单槽交换  

28.              ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))//多槽不为空,且现场未中断,进行多槽交换  

29.             )  

30.             throw new InterruptedException();//抛出异常  

31.         return (v == NULL_ITEM) ? null : (V)v;//返回新值  

32.     }  

33.     //单槽交换  

34.     private final Object slotExchange(Object item, boolean timed, long ns){  

35.         Node p = participant.get();//从threadlocal中取出值  

36.         Thread t = Thread.currentThread();  

37.         if (t.isInterrupted()) // preserve interrupt status so caller can recheck  

38.             return null;  

39.   

40.         for (Node q;;) {  

41.             if ((q = slot) != null) {//其他交换线程先到了  

42.                 if (U.compareAndSwapObject(this, SLOT, q, null)) {//进行值交换  

43.                     Object v = q.item;  

44.                     q.match = item;  

45.                     Thread w = q.parked;  

46.                     if (w != null)  

47.                         U.unpark(w);//交换成功,唤醒等待的线程  

48.                     return v;  

49.                 }  

50.                 // 交换失败,进行创建多槽交换数组,设置bound的值为seq  

51.                 if (NCPU > 1 && bound == 0 &&  

52.                     U.compareAndSwapInt(this, BOUND, 0, SEQ))  

53.                     arena = new Node[(FULL + 2) << ASHIFT];  

54.             }  

55.             else if (arena != null)//arena 不为空说明应该进行多槽交换,直接 退出进行多槽交换  

56.                 return null// caller must reroute to arenaExchange  

57.             else {  

58.                 p.item = item;  

59.                 if (U.compareAndSwapObject(this, SLOT, null, p))//当前线程先到,占用slot值  

60.                     break;  

61.                 p.item = null;  

62.             }  

63.         }  

64.   

65.         // 等待其他线程到达,并且唤醒当前线程  

66.         int h = p.hash;  

67.         long end = timed ? System.nanoTime() + ns : 0L;  

68.         int spins = (NCPU > 1) ? SPINS : 1;  

69.         Object v;  

70.         while ((v = p.match) == null) {//等待的线程没到,进行随机自旋  

71.             if (spins > 0) {  

72.                 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;  

73.                 if (h == 0)  

74.                     h = SPINS | (int)t.getId();  

75.                 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)  

76.                     Thread.yield();  

77.             }  

78.             else if (slot != p)  

79.                 spins = SPINS;  

80.             else if (!t.isInterrupted() && arena == null &&  

81.                      (!timed || (ns = end - System.nanoTime()) > 0L)) {  

82.                 U.putObject(t, BLOCKER, this);//自旋时间过久,直接阻塞当前线程  

83.                 p.parked = t;  

84.                 if (slot == p)  

85.                     U.park(false, ns);  

86.                 p.parked = null;  

87.                 U.putObject(t, BLOCKER, null);  

88.             }  

89.             else if (U.compareAndSwapObject(this, SLOT, p, null)) {//超时或者取消了,把slot让出来  

90.                 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;  

91.                 break;  

92.             }  

93.         }  

94.         U.putOrderedObject(p, MATCH, null);  

95.         p.item = null;  

96.         p.hash = h;  

97.         return v;  

98.     }  

99.     //多槽自旋  

100.     private final Object arenaExchange(Object item, boolean timed, long ns) {  

101.         Node[] a = arena;  

102.         Node p = participant.get();//当前线程的数据值  

103.         for (int i = p.index;;) {                      // 数组值的索引  

104.             int b, m, c; long j;                       // j is raw array offset  

106.             Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);  

107.             if (q != null && U.compareAndSwapObject(a, j, q, null)) {//q不是空的,说明已有线程占用  

108.                 Object v = q.item;                     // 和单槽一样交换并唤醒  

109.                 q.match = item;  

110.                 Thread w = q.parked;  

111.                 if (w != null)  

112.                     U.unpark(w);  

113.                 return v;  

114.             }  

115.             else if (i <= (m = (b = bound) & MMASK) && q == null) {//有槽位置,但是为空,和上面的一样,先自旋等待,然后阻塞  

116.                 p.item = item;                         // offer  

117.                 if (U.compareAndSwapObject(a, j, null, p)) {//占槽  

118.                     long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;  

119.                     Thread t = Thread.currentThread(); // wait  

120.                     for (int h = p.hash, spins = SPINS;;) {  

121.                         Object v = p.match;  

122.                         if (v != null) {//有配对的到达,交换之后返回  

123.                             U.putOrderedObject(p, MATCH, null);  

124.                             p.item = null;             // clear for next use  

125.                             p.hash = h;  

126.                             return v;  

127.                         }  

128.                         else if (spins > 0) {//进行自旋次数的操作  

129.                             h ^= h << 1; h ^= h >>> 3; h ^= h << 10// xorshift  

130.                             if (h == 0)                // initialize hash  

131.                                 h = SPINS | (int)t.getId();  

132.                             else if (h < 0 &&          // approx 50% true  

133.                                      (--spins & ((SPINS >>> 1) - 1)) == 0)  

134.                                 Thread.yield();        // two yields per wait  

135.                         }  

136.                         else if (U.getObjectVolatile(a, j) != p)  

137.                             spins = SPINS;       // releaser hasn't set match yet  

138.                         else if (!t.isInterrupted() && m == 0 &&  

139.                                  (!timed ||  

140.                                   (ns = end - System.nanoTime()) > 0L)) {//阻塞当前  

141.                             U.putObject(t, BLOCKER, this); // emulate LockSupport  

142.                             p.parked = t;              // minimize window  

143.                             if (U.getObjectVolatile(a, j) == p)  

144.                                 U.park(false, ns);  

145.                             p.parked = null;  

146.                             U.putObject(t, BLOCKER, null);  

147.                         }  

148.                         else if (U.getObjectVolatile(a, j) == p &&  

149.                                  U.compareAndSwapObject(a, j, p, null)) {  

150.                             if (m != 0)                // try to shrink  

151.                                 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);//尝试缩减卡长度  

152.                             p.item = null;  

153.                             p.hash = h;  

154.                             i = p.index >>>= 1;        // descend  

155.                             if (Thread.interrupted())//被中断  

156.                                 return null;  

157.                             if (timed && m == 0 && ns <= 0L)//超时返回  

158.                                 return TIMED_OUT;  

159.                             break;                     // expired; restart  

160.                         }  

161.                     }  

162.                 }  

163.                 else  

164.                     p.item = null;                     // clear offer  

165.             }  

166.             else {//无效的卡槽位置,下面的需要扩容  

167.                 if (p.bound != b) {                    // stale; reset  

168.                     p.bound = b;  

169.                     p.collides = 0;  

170.                     i = (i != m || m == 0) ? m : m - 1;  

171.                 }  

172.                 else if ((c = p.collides) < m || m == FULL ||  

173.                          !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {//进行扩容操作  

174.                     p.collides = c + 1;  

175.                     i = (i == 0) ? m : i - 1;          // cyclically traverse  

176.                 }  

177.                 else  

178.                     i = m + 1;                         // grow  

179.                 p.index = i;  

180.             }  

181.         }  

182.     }  

183.   

184. }  

核心的方法在于单槽交换和多槽交换,实现的交换流程主要有以下几个步骤:

1. 先到的进行自旋等待

2. 如果等待不到开始阻塞当前线程

3. 如果超时或者中断直接返回

4. 后到的直接进行匹配交换

在实际中,我个人不怎么使用exchanger类,不过里面提供的思想在很多java 类中都有体现,这个是值得学习的,ok,本文的内容就到这里了,该类其余的内容读者可自行查看。


以上是关于jdk源码解析--Exchanger类的主要内容,如果未能解决你的问题,请参考以下文章

jdk源码解析--LongAdder类

jdk源码解析--AbstractStringBuilder类

jdk源码解析--String 类

jdk源码解析--BitSet类

jdk源码解析--Writer类

jdk源码解析--WeakReference/softReference类