exchanger java另一种栅栏

Posted liumy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了exchanger java另一种栅栏相关的知识,希望对你有一定的参考价值。

是另一种栅栏,它是一种两方two-party栅栏,各方在栅栏位置上交换数据。

当两方执行不对称的操作时,exchanger会非常有用。

场景例子:

当一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。

数据交换的时机取决于应用程序的相应需求。最简单的方案是当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时,由清空任务进行交换。这样会把需要交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。另一个方法是不仅当缓冲被填满时进行交换,并且当缓冲被填充到一定程度并保持一定时间后,也进行交换。

例子:

技术图片
 1 package com.citi.test.mutiplethread.demo5;
 2 
 3 import java.util.concurrent.Exchanger;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ExchangerTest 
 8     public static void main(String[] args) 
 9         ExecutorService executor=Executors.newCachedThreadPool();
10         final Exchanger exchanger=new Exchanger();
11         executor.execute(new Runnable() 
12             String data1="Ling";
13             @Override
14             public void run() 
15                 doExchangerWork(data1, exchanger);
16             
17         );
18         executor.execute(new Runnable() 
19             String data1="huhx";
20             @Override
21             public void run() 
22                 doExchangerWork(data1, exchanger);
23             
24         );
25         executor.shutdown();
26     
27     private static void doExchangerWork(String data1,Exchanger exchanger)
28         try 
29             System.out.println(Thread.currentThread().getName()+" 正在把数据"+data1+"交换出去");
30             Thread.sleep((long)(Math.random()*1000));
31             
32             String data2=(String)exchanger.exchange(data1);
33             System.out.println(Thread.currentThread().getName()+" 交换数据到"+data2);
34          catch (Exception e) 
35             e.printStackTrace();
36         
37     
38 
View Code

如果我们一直买东西,而不邮寄东西,那么Exchanger类其实就变成了简化版本的生产者和消费者的模型。快递员就是生产者,我们本身就是消费者,而柜子就成为了我们媒介容器,看下面的一个例子:

技术图片
 1 package com.citi.test.mutiplethread.demo5;
 2 
 3 import java.util.concurrent.Exchanger;
 4 import java.util.concurrent.TimeUnit;
 5 import java.util.concurrent.atomic.AtomicInteger;
 6 
 7 public class ExchangerTest1 
 8     private static Exchanger<DataBuffer<Integer>> exchanger=new Exchanger<>();
 9     static DataBuffer<Integer> initialEmptyBuffer=new DataBuffer<Integer>();
10     static DataBuffer<Integer> initialFullBuffer=new DataBuffer<Integer>();
11     static AtomicInteger countDown=new AtomicInteger(5);
12     static class ProducerWorker implements Runnable
13         long sleep;
14         public ProducerWorker(long sleep) 
15             this.sleep=sleep;
16         
17         @Override
18         public void run() 
19             DataBuffer<Integer> currentBuffer=initialEmptyBuffer;
20             while(currentBuffer!=null&&countDown.get()>0)
21                 try 
22                     TimeUnit.SECONDS.sleep(sleep);
23                  catch (InterruptedException e) 
24                     // TODO Auto-generated catch block
25                     e.printStackTrace();
26                 
27                 currentBuffer.put(countDown.get());//每次放入数据
28                 if(currentBuffer.isFull())
29                     try 
30                         System.out.println(Thread.currentThread().getName()+" 放入了快递"+countDown.get());
31                         currentBuffer=exchanger.exchange(currentBuffer);
32                      catch (InterruptedException e) 
33                         // TODO Auto-generated catch block
34                         e.printStackTrace();
35                     //交换后得到null
36                 
37                 countDown.getAndDecrement();
38             
39         
40     
41     
42     static class ConsumerWorker implements Runnable
43         long sleep;
44         public ConsumerWorker(long sleep) 
45             this.sleep=sleep;
46         
47         @Override
48         public void run() 
49             DataBuffer<Integer> currentBuffer=initialFullBuffer;
50             while(currentBuffer!=null&&countDown.get()>0)
51                 try 
52                     TimeUnit.SECONDS.sleep(sleep);
53                  catch (InterruptedException e) 
54                     // TODO Auto-generated catch block
55                     e.printStackTrace();
56                 
57                 //如果为空就进行交换
58                 if(currentBuffer.isEmpty())
59                     try 
60                         currentBuffer=exchanger.exchange(currentBuffer);//交换数据
61                         Integer value=currentBuffer.get();
62                         System.out.println(Thread.currentThread().getName()+" 拿走了快递"+value);
63                         System.out.println();
64                      catch (Exception e) 
65                         e.printStackTrace();
66                     
67                 
68             
69         
70     
71     public static void main(String[] args) 
72         new Thread(new ProducerWorker(1),"快递员").start();
73         new Thread(new ConsumerWorker(3),"").start();
74     
75     
76     private static class DataBuffer<T>
77         T data;
78         public boolean isFull()
79             return data!=null;
80         
81         public boolean isEmpty()
82             return data==null;
83         
84         public T get()
85             T d=data;
86             data=null;
87             return d;
88         
89         public void put(T data)
90             this.data=data;
91         
92     
93 
View Code

下面是主要的内部类,属性和方法。

技术图片
 1 /**
 2  * Nodes hold partially exchanged data.  This class
 3  * opportunistically subclasses AtomicReference to represent the
 4  * hole.  So get() returns hole, and compareAndSet CAS‘es value
 5  * into hole.  This class cannot be parameterized as "V" because
 6  * of the use of non-V CANCEL sentinels.
 7  
 8     Node 持有部分交换数据。这个类继承AtomicReference适时地代表那个洞。
 9     所以get()返回洞,并且用CAS来将值填充进洞。
10     
11  
12  */
13 private static final class Node extends AtomicReference<Object> 
14     /** The element offered by the Thread creating this node. */
15     public final Object item;
16 
17     /** The Thread waiting to be signalled; null until waiting. */
18     public volatile Thread waiter;
19 
20     /**
21      * Creates node with given item and empty hole.
22      * @param item the item
23      */
24     public Node(Object item) 
25         this.item = item;
26     
27 
28 
29 /**
30  * A Slot is an AtomicReference with heuristic padding to lessen
31  * cache effects of this heavily CAS‘ed location.  While the
32  * padding adds noticeable space, all slots are created only on
33  * demand, and there will be more than one of them only when it
34  * would improve throughput more than enough to outweigh using
35  * extra space.
36  */
37 private static final class Slot extends AtomicReference<Object> 
38     // Improve likelihood of isolation on <= 64 byte cache lines
39     long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
40 
41 
42 /**
43  * Main exchange function, handling the different policy variants.
44  * Uses Object, not "V" as argument and return value to simplify
45  * handling of sentinel values.  Callers from public methods decode
46  * and cast accordingly.
47  *
48  * @param item the (non-null) item to exchange
49  * @param timed true if the wait is timed
50  * @param nanos if timed, the maximum wait time
51  * @return the other thread‘s item, or CANCEL if interrupted or timed out
52  */
53 private Object doExchange(Object item, boolean timed, long nanos) 
54     Node me = new Node(item);                 // Create in case occupying
55     int index = hashIndex();                  // Index of current slot
56     int fails = 0;                            // Number of CAS failures
57 
58     for (;;) 
59         Object y;                             // Contents of current slot
60         Slot slot = arena[index];
61         if (slot == null)                     // Lazily initialize slots
62             createSlot(index);                // Continue loop to reread
63         else if ((y = slot.get()) != null &&  // Try to fulfill
64                  slot.compareAndSet(y, null)) 
65             Node you = (Node)y;               // Transfer item
66             if (you.compareAndSet(null, item)) 
67                 LockSupport.unpark(you.waiter);
68                 return you.item;
69                                              // Else cancelled; continue
70         
71         else if (y == null &&                 // Try to occupy
72                  slot.compareAndSet(null, me)) 
73             if (index == 0)                   // Blocking wait for slot 0
74                 return timed ?
75                     awaitNanos(me, slot, nanos) :
76                     await(me, slot);
77             Object v = spinWait(me, slot);    // Spin wait for non-0
78             if (v != CANCEL)
79                 return v;
80             me = new Node(item);              // Throw away cancelled node
81             int m = max.get();
82             if (m > (index >>>= 1))           // Decrease index
83                 max.compareAndSet(m, m - 1);  // Maybe shrink table
84         
85         else if (++fails > 1)                // Allow 2 fails on 1st slot
86             int m = max.get();
87             if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
88                 index = m + 1;                // Grow on 3rd failed slot
89             else if (--index < 0)
90                 index = m;                    // Circularly traverse
91         
92     
93 
View Code

底层原理分析:

用到的关键技术是

1.使用CAS自旋来进行数据交换。

2.使用LockSupport的park方法使交换线程进入休眠等待,用unpartk方法使线程唤醒。

3.此外还声明了一个Node对象来存储交换数据,该类继承了AtomicReference.

使用exchanger可以轻松的实现两个线程交换数据。如果超过两个线程,很可能会有问题。

 

 

参考资料:

https://cloud.tencent.com/developer/article/1350850

以上是关于exchanger java另一种栅栏的主要内容,如果未能解决你的问题,请参考以下文章

Exchanger详解

Windows Server 2016 + Exchange 2016 +Office365混合部署

如何从 Exchange Server 2007 中提取公共日历数据?

在 C# 中阅读 MS Exchange 电子邮件

JAVA并发包源码分析循环栅栏:CyclicBarrier

小米自带电子邮箱怎么设置接收公司Exchange邮件?