java同步器
Posted pamne
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步器相关的知识,希望对你有一定的参考价值。
摘要:java提供了synchronized关键字对临界区进行线程同步访问。由于synchronized 很难正确的编写同步代码,并发工具类提供了高级的同步器(控制通用同步方法的类)
本文主要介绍倒计时门闩(CountDownLatch)、同步屏障(cyclic barrier)、交换器(exchanger)、信号量(semaphore)以及phaser 同步器。
1.倒计时门闩(CountDownLatch)
CountDownLatch核心思想:倒计时门栓会导致一条或者多条线程一直在门口等待,知道另一条线程打开这扇们,线程才得以继续运行。它是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程一直等待直到计数变为0”以及“递减计数变量”
CountDownLatch核心方法:
void await():除非线程中断,否则强制调用线程一直等到计数倒数为0
boolean await(long timeout,TimeUnit unit):除非线程中断,否则一直强制调用线程一直等到计数倒数为0作者以unit作为timeout超时。
void CountDown():递减计数,当计数降至0时,释放所有的等待线程,当已经为0 时,什么也不会发生。
long getCount():返回当前的计数,用于测试和调试
String toString();返回一条标识这个门闩及其状态的字符串。
package multithreading; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchDemo { final static int nThreads=3; public static void main(String[] args) { // TODO Auto-generated method stub final CountDownLatch startFlag=new CountDownLatch(1); final CountDownLatch doneFlag=new CountDownLatch(nThreads); Runnable r=new Runnable() { @Override public void run() { // TODO Auto-generated method stub try { report("entered run"); startFlag.await(); report("doing work!"); Thread.sleep(2000); doneFlag.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } void report(String s) { // TODO Auto-generated method stub System.out.println(System.currentTimeMillis()+""+Thread.currentThread()+":"+s); } }; ExecutorService executor=Executors.newFixedThreadPool(nThreads); for(int i=0;i<nThreads;i++) { executor.execute(r); }; System.err.println("主线程开始执行"); try { Thread.sleep(1000); startFlag.countDown(); System.err.println("主线程do something else!"); doneFlag.await(); executor.shutdown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //每次执行完任务执行1秒 } }
运行结果:
注:startFlag门闩会在默认主线程结束之前禁止任何线程执行,endFlag线程会在默认3条主线程全部执行完。
2.同步屏障(CyclicBarrier)
核心思想:同步屏障允许一组线程彼此相互等待,知道抵达某个公共的屏障点,因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。
使用场景:数量固定并且相互之间必须不时等待彼此的多线程应用。
核心方法:int await():强调线程一直等待直到所有的parties都已经在同步屏障上调用await()方法。当调用线程自己或其他等待线程被中断、有线程在等待中超时或者有线程在同步屏障之上调用reset()方法,该调用线程就会停止等待。
如果调用线程在进入方法时设置过中断装袋或者等待时被中断,该方法就会抛出Interrupted Exception并且中断状态会被清除。
线程正在等待时该同步屏障被重置了(通过reset()方法),该方法就会抛出java.until.conrrent.BrokenBarrierException
调用线程是最后一条线程,并企稳构造函数提供了一个非空的barrierAction,这条线程就会允许其他线程继续执行之前率先执行的runnable。该方法就会返回调用线程到达索引,getParties()-1代表第一条到达的线程,0表示最后一条到达的线 程。
int wait(long timeout,TimeUnit unit):除了让你指定调用线程愿意等待的时长之外,该方法等同于之前的方法。 该线程在等待中超时,该方法会抛出java.util.concurrent.TimeoutException
int getNumberWaiting():返回当前在同步屏幕上等待的线程数目,该方法用于调试和断言
int getParties();返回需要跨越同步屏障的线程数目。
int isBroken();当一条或者多条线程由于在同步屏障创建或长刺重置之后,中断或者超时打破同步屏障,或者有因为一个异常导致barrier action 失败,返回true,否则返回false
void reset():把同步屏障重置到器原始状态。如果此时任意的线程等待在这个同步屏障上,就会抛出一个BrokenBarrier.注意,在由于某些原因发生 的跳出操作之后进行重置是非常难以实现的。线程需要通过一一些其他方式同步并挑选一条线程进 行同步操作。最好给后续的使用创建一个新的同步屏障。
package multithreading; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { /** * 使用同步屏障把一个任务分解成多个子任务 * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub float [][] matrix=new float[3][3]; int counter=0; for(int row=0;row<matrix.length;row++) { for(int col=0;col<matrix.length;col++) { matrix[row][col]=counter++; } } dump(matrix); System.out.println(); //为每一行分别建立一个线程进行计算 Solver solver=new Solver(matrix); System.out.println(solver.flag); dump(matrix); } //打印矩阵 static void dump(float[][] matrix) { // TODO Auto-generated method stub for(int row=0;row<matrix.length;row++) { for(int col=0;col<matrix.length;col++) { System.out.print(matrix[row][col]+" "); } System.out.println(); } } } class Solver { float[][] data; CyclicBarrier barrier ; boolean flag=false; public Solver(float[][] matrix) { // TODO Auto-generated constructor stub data = matrix; //创建3条同步屏障,在屏障跨越的时候启动 barrier = new CyclicBarrier(matrix.length, new Runnable() { @Override public void run() { new CyclicBarrierDemo(); // TODO Auto-generated method stub CyclicBarrierDemo.dump(matrix); } }); System.err.println("创建的线程条数"+matrix.length); for(int i=0;i<matrix.length;i++) { //启动线程 new Thread(new Worker(barrier,matrix,i)).start(); System.out.println("主线程正在等待..."); System.out.println("主线程被唤醒..."); } } }
package multithreading; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class Worker implements Runnable{ int myrow; boolean done=false; private float[][] data; CyclicBarrier barrier; public Worker(float[][] matrix, int i) { // TODO Auto-generated constructor stub } public Worker(CyclicBarrier barrier2, float[][] matrix, int i) { // TODO Auto-generated constructor stub myrow=i; data = matrix; barrier=barrier2; } @Override public void run() { // TODO Auto-generated method stub while(!done) { processRow(); //开始计算 try { System.out.println("++++++++++屏障等待线程条数"+barrier.getNumberWaiting()); barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block return; } catch (BrokenBarrierException e) { // TODO Auto-generated catch block return; } } } //开始计算 void processRow() { System.out.println("开始计算,行数为:"+myrow); for(int i=0;i<data.length;i++) { data[myrow][i]*=10; done=true; } System.err.println("第"+myrow+"行计算完毕"); } }
注:创建n条线程的同步屏障在跨越时启动
运行结果:
3.交换器
核心思想:交换器提供了一个线程彼此之间能够交换对象的同步点。每条线程都会往这个交换器的exchange()方法中传入一些对象,匹配伙伴线程,同时接受伙伴线程中的对象作为返回值。在诸如遗传算法和管道设计的应用程序中,交换器会很有用。
核心方法:
V exchange(V,x) :在这个交互点等待其他线程到达(除非调用线程中断),之后将所有的对象方法传入其中,接收其他线程的对象 作为返回。如果其他交换线程已经在交换点等待,为了线程调度,它会从中恢复并且会接收调用线程锁传入的对 象,当前线程会立即返回,接收其他线程传入交换器中的对象。当调用线程被中断了该方法会抛出InterruptedException
V exchange(V,x,long timeout,TimeUnit unit):除了让指定的调用线程愿意等待的时长之外,该方法等同于之前的方法。当线程在等待时,该方法会抛出TimeoutExeception
package Exchanger; import java.util.concurrent.Exchanger; public class ExechangeDemo { final static Exchanger<DataBuffer> exchanger=new Exchanger<DataBuffer>(); final static DataBuffer initialEmptyBuffer=new DataBuffer(); final static DataBuffer initialFullBuffer=new DataBuffer("I"); public static void main(String[] args) { // TODO Auto-generated method stub new Thread(new EmptyLoop(exchanger,initialFullBuffer)).start(); new Thread(new FillingLoop(exchanger,initialEmptyBuffer)).start(); } }
package Exchanger; import java.util.ArrayList; import java.util.List; public class DataBuffer { private final static int maxitem=10; private final List<String> items=new ArrayList<>(); public DataBuffer() { // TODO Auto-generated constructor stub } public DataBuffer(String prefix) { // TODO Auto-generated constructor stub for(int i=0;i<maxitem;i++) { String item=prefix+i; System.out.println("Adding"+item); items.add(item); } } //添加元素 synchronized void add(String s) { if(!isFull()) { items.add(s); } } //缓冲区元素是否 synchronized boolean isFull() { // TODO Auto-generated method stub return items.size()==maxitem; } synchronized String remove() { if(!isEmpty()) { return items.remove(0); } return null; } synchronized boolean isEmpty() { // TODO Auto-generated method stub return items.size()==0; } }
package Exchanger; import java.util.concurrent.Exchanger; public class FillingLoop implements Runnable{ int count=0; DataBuffer currentBuffer; final Exchanger<DataBuffer> exchanger; public FillingLoop(Exchanger<DataBuffer> exchanger2, DataBuffer initialemptybuffer) { // TODO Auto-generated constructor stub currentBuffer=initialemptybuffer; exchanger=exchanger2; } @Override public void run() { // TODO Auto-generated method stub try { while(true) { addTobuffer(currentBuffer); System.err.println(currentBuffer.isFull()); if(currentBuffer.isFull()) { System.out.println("filling thread wants to exchange..."); currentBuffer=exchanger.exchange(currentBuffer); System.err.println("filling thread recives exchange"); } } }catch (InterruptedException e) { // TODO Auto-generated catch block System.err.println("filling thread interrupt.."); } } private void addTobuffer(DataBuffer buffer) { // TODO Auto-generated method stub String item="缓冲区填充->"+count++; System.out.println("Adding:"+item); buffer.add(item); } }
package Exchanger; import java.util.concurrent.Exchanger; public class EmptyLoop implements Runnable{ final Exchanger<DataBuffer> exchanger; DataBuffer currentBuffer; public EmptyLoop(Exchanger<DataBuffer> exchanger2, DataBuffer initialfullbuffer) { // TODO Auto-generated constructor stub currentBuffer=initialfullbuffer; exchanger=exchanger2; } @Override public void run() { // TODO Auto-generated method stub try { while(true) { takeFromBuffer(currentBuffer); if(currentBuffer.isEmpty()) { System.out.println("empty thread wants to exchange"); currentBuffer=exchanger.exchange(currentBuffer); } System.err.println("emptying thread recives exchage"); } }catch (InterruptedException e) { // TODO Auto-generated catch block System.err.println("emptying thread interrupted"); } } void takeFromBuffer(DataBuffer buffer) { // TODO Auto-generated method stub System.out.println("缓冲区取数据->:"+buffer.remove()); } }
运行结果:
4.信号量
信号量维护了一组许可证(permit),以约束访问被限制资源的线程数。当没有可用的许可证时,线程的获取尝试 一直阻塞,知道其他线的线程释放一个许可证。
类java.util.concurrent.semaphore实现了这一同步器,同时将信号量概念化成一个维护许可证的对象。你可以调用semaphore(int permits)构造函数来初始化一个信号量,其中permits指定了许可证的数量。这个信号量的公平策略被设置成false(不公平)。
也可以调成公平策略。
注:当前的值可以被递增1的信号量称为计数信号量。而当前的值只能取0或1 的信号量则称为二进制信号量或者互斥信号量。在这两种场景中当前的值都不能为负数
5.信号量和公平策略
不公平策略为false:acquire()方法能先于等待线程获取许可证。
公平策略:信号量能顾保证调用acquire()方法的任意线程能按照方法被调用处理的顺序获取许可证(先进先出,fifo)
因为FIFO顺序需要应用到这些方法中指定的内部执行点,很可能一条线程先于另一条线程调用acquire()方法,但是却后于那条线程抵达顺序点,从方法中返回也类似。
不限时的tryAcquire()方法不会遵循公平策略的设定。
信号量核心方法:
void acquire():从这个信号量中获取一个许可证,否则用阻塞直到有一个许可证可用或者调用线程别中断。
void acquire(int permits):从这个信号量中获取permits数量的许可证,否则阻塞直到这些许可证可用或者调用线程中断,抛出异常,当permits小于0,则抛出illegalArgumentException
void acquireUninterruptibly():获取一个许可证,否则阻塞到有一个许可证可以用
int availablePermits();返回当前可用许可证的数目,该方法用于调试及测试
int drainPermits():获取并返回立即可用的许可证。
int getQueueLength();返回等待获取许可证的大致线程数。返回的值为动态的估算值。
boolean hasQueuedThreads():查询是否存在等待获取许可证的线程。由于取消可能发生在任意时刻,返回true并不能保证其他线程就能获取许可证
boolean isFail():返回公平设置
void release();释放一个许可证,将其返回给信号量,可用许可证数量+1.如果任何线程尝试获取一个许可证,被选用的线程就会给予刚刚释放的许可证,那条线程就会因为调度被重新启用。
void release(int permits):释放permits数量的许可证。若没有足够的许可证可用,该线程会一直等待,知道足够的许可证可以用。
String toString():返回比嗾使该信号量及其状态的字符串。
boolean tryAcquire();仅当调用时有一个许可证可用的情况,才能从这个信号量中获取这个许可证。
boolean tryAcquire(int permits);仅当调用时有permits个许可证可用的情况,才从这个信号量中获取这些许可证
boolean tryAcquire(long timeout TimeUnit unit);调用线程会一直等待直到有一个许可证可以用。
示例代码:
package semaphore; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SemaphoreDemo { public static void main(String[] args) { // TODO Auto-generated method stub final Pool pool=new Pool(); Runnable r=new Runnable() { @Override public void run() { // TODO Auto-generated method stub String name=Thread.currentThread().getName(); try { while(true) { String item; System.err.println(name+" acquireing "+(item=pool.getItem()));//从信号量中获取一个许可证
Thread.sleep(200+(int)(Math.random()*100)); System.out.println(name +" putting back "+item); pool.putItem(item); } }catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; ExecutorService[] executors =new ExecutorService[Pool.MAX_AVAILABLE+1]; for(int i=0;i<executors.length; i++) { executors[i]=Executors.newSingleThreadExecutor(); executors[i].execute(r); } } }
package semaphore; import java.util.concurrent.Semaphore; public class Pool { public final static int MAX_AVAILABLE=10; private final Semaphore avaliable=new Semaphore(MAX_AVAILABLE, true);//设置为公平策略,保证有任何线程在访问资源时被饿死 private final String[] items; private final boolean[] used=new boolean[MAX_AVAILABLE]; Pool() { items=new String[MAX_AVAILABLE]; for(int i=0;i<items.length;i++) { items[i]="I "+i; } } String getItem() throws InterruptedException { avaliable.acquire(); return getNextAvailableItem(); } void putItem(String item) { if(markAsUnused(item)) { avaliable.release(); } } private synchronized boolean markAsUnused(String item) { // TODO Auto-generated method stub for(int i=0;i<MAX_AVAILABLE;++i) { if(item==items[i]) { if(used[i]) { used[i]=false; return true; } else { return false; } } } return false; } private synchronized String getNextAvailableItem() { // TODO Auto-generated method stub for(int i=0;i<MAX_AVAILABLE;++i) { if(!used[i]) { used[i]=true; return items[i]; } } return null; } }
运行结果
5.Phaser
phaser是一个更加弹性的同步屏障。和同步屏障一样,一个phaser使得一组线程在屏障上等待,在最后一条线程到达之后,这些线程得以继续执行。phaser也提供barrier action的等价操作。和同步屏障协调固定数目的线程不同,一个phaser 能够协调不同数目的线程,这些线程可以在任何时候注册。为了实现这一功能,phaser使用了phase和phase值。
phaser是phaser当前的状态,同时这一状态被一个整型的phase值所确定,当最后一条注册线程到达phaser屏障,phaser提前抵达phase并且给其加1
核心方法:phaser(int phaser) 构造函数创建了一个phaser,一开始就协调parties数目的线程(还么有抵达phaser屏障),同时其phaser初始为0
int register():往phaser中添加一条尚未抵达的线程,同时返回phase值作抵达分类用。这个值被称为phase值
代码示例:https://blog.csdn.net/sprayabc/article/details/11737993
以上是关于java同步器的主要内容,如果未能解决你的问题,请参考以下文章
[工作积累] UE4 并行渲染的同步 - Sync between FParallelCommandListSet & FRHICommandListImmediate calls(代码片段