新类库的构件
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了新类库的构件相关的知识,希望对你有一定的参考价值。
CountDownLatch
public class CountDownLatch extends Object
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A
CountDownLatch
is initialized with a given count. Theawait
methods block until the current count reaches zero due to invocations of thecountDown()
method, after which all waiting threads are released and any subsequent invocations ofawait
return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using aCyclicBarrier
.A
CountDownLatch
is a versatile synchronization tool and can be used for a number of purposes. ACountDownLatch
initialized with a count of one serves as a simple on/off latch, or gate: all threads invokingawait
wait at the gate until it is opened by a thread invokingcountDown()
. ACountDownLatch
initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.A useful property of a
CountDownLatch
is that it doesn‘t require that threads callingcountDown
wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past anawait
until all threads could pass.
用来同步一个或多个任务,强制它们等待其他任务执行的一组操作完成。可以这么理解,有两组任务A,B。A的多个任务等待B组的所有任务结束才能执行。
CountDownLatch对象设置一个初始的值。A中的任务执行前先调用CountDownLatch.await()方法将当前任务阻塞,当CountDownLatch的值为0时才能进行下去。B中的每个任务执行完都调用CountDownLatch.countDown()来减小计数值。这样就可以保证B中的任务可以同时进行,当B的任务全部结束,A的任务才可以开始。
CountDownLatch对象的计数值不能被再次重置,只能使用一次。想要重置,使用CyclicBarrier
。
public class CountDownLatchDemo { static final int SIZE = 10; public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(SIZE); for(int i=0;i<10;i++){ exec.execute(new A(latch)); } for(int i=0;i<10;i++){ exec.execute(new B(latch)); } System.out.println("Launched all tasks"); exec.shutdown(); } } class B implements Runnable{ private static int counter =0; private final int id = counter++; private static Random random = new Random(47); private final CountDownLatch latch; public B(CountDownLatch latch){ this.latch=latch; } public void run(){ try{ doWork(); latch.countDown();//B中完成一次任务,计数值减1 }catch(InterruptedException e){ e.printStackTrace(); } } private void doWork() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(random.nextInt(10)); System.out.println(this+" completed"); } public String toString(){ return String.format("B %1$-3d", id); } } class A implements Runnable{ private static int counter =0; private final int id = counter++; private static Random random = new Random(57); private final CountDownLatch latch; public A(CountDownLatch latch){ this.latch=latch; } @Override public void run() { try{ latch.await();//等待计数值为0,在这之前都处于阻塞状态 TimeUnit.MILLISECONDS.sleep(random.nextInt(1)); System.out.println("Latch barrier passedd for "+this); }catch(InterruptedException e){ e.printStackTrace(); } } public String toString(){ return String.format("A %1$-3d", id); } } 输出: Launched all tasks B 3 completed B 2 completed B 6 completed B 5 completed B 0 completed B 7 completed B 1 completed B 4 completed B 9 completed B 8 completed Latch barrier passedd for A 0
Latch barrier passedd for A 3
Latch barrier passedd for A 2
Latch barrier passedd for A 1
Latch barrier passedd for A 4
Latch barrier passedd for A 5
Latch barrier passedd for A 6
Latch barrier passedd for A 7
Latch barrier passedd for A 8
Latch barrier passedd for A 9
CyclicBarrier
试想有一个任务,把它分割成多个子任务交给不同的线程去做,等它们都完成后,再执行下一个步骤。这时可以用CyclicBarrier对象。
初始化CyclicBarrier对象时需要指定任务的数目n,当有n个线程对同一个CyclicBarrier对象的await方法调用并进入阻塞的话,才算达到栅栏点。
以下程序演示对矩阵的每个元素求平方,我们用5个线程分别处理5行数据。等到5个线程都完成工作后,将处理好的矩阵打印输出。
public class CyclicBarrierDemo { public static void main(String[] args) { int[][] matrix={{1,1,1,1,1},{2,2,2,2,2},{3,3,3,3,3},{4,4,4,4,4},{5,5,5,5,5}}; List<int[]> list = new ArrayList<int[]>();//把矩阵每一行放在list里 for(int i=0;i<matrix.length;i++){list.add(matrix[i]);} ExecutorService exec = Executors.newCachedThreadPool(); CyclicBarrier barrier = new CyclicBarrier(5,new Runnable(){ public void run() { System.out.println("Solver are all completed"); //打印处理后的矩阵 for(int i=0;i<matrix.length;i++){ System.out.println(Arrays.toString(list.get(i))); } } }); for(int i=0;i<5;i++){ exec.execute(new Solver(barrier,list.get(i))); } } } class Solver implements Runnable{ private int[] row; private static int count=0; public Random random = new Random(47); private final int id = count++; private CyclicBarrier barrier; public Solver(CyclicBarrier barrier,int[] row){ this.barrier=barrier; this.row=row; } public void run(){ try { //任务开始 int length = row.length; for(int i=0;i<length;i++){ row[i]=row[i]*row[i]; } TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); System.out.println("Solver "+id+" completed"); //任务结束,到达栅栏点 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch (BrokenBarrierException e) { e.printStackTrace(); } } } 输出: Solver 1 completed Solver 2 completed Solver 0 completed Solver 3 completed Solver 4 completed Solver are all completed [1, 1, 1, 1, 1] [4, 4, 4, 4, 4] [9, 9, 9, 9, 9] [16, 16, 16, 16, 16] [25, 25, 25, 25, 25]
DelayQueue
这是一个无界的BlockingQueue,里面放置Delayed对象,其中的对象只能在其到期后才能被取走。如果队列中无到期的对象,则等待。该队列是有序队列,队首是最先过期的那个对象。因此该队列内部的对象需要比较各自的过期时间,故对象必须实现Delayed接口,即实现两个方法compareTo()和getDelay()。一个是用于比较对象之间的时间先后,一个用于获取对象的过期时间。
public class DelayQueueDemo { public static void main(String[] args) { DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); ExecutorService exec = Executors.newCachedThreadPool(); Random random = new Random(48); for(int i=0;i<20;i++) queue.put(new DelayedTask(random.nextInt(1000)));//把所有具有延迟到期功能的对象放在DelayQueue对列里 exec.execute(new DelayedTaskConsumer(queue)); queue.put(new DelayedTask(500)); } } //具有延迟到期功能的任务 class DelayedTask implements Runnable,Delayed{ private static int count=0; private final int id = count++; private final int delta; private final long trigger; public DelayedTask(int delayMilliseconds){ delta = delayMilliseconds; trigger = System.nanoTime()+TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayedTask that = (DelayedTask) o; if(trigger<that.trigger) return -1; if(trigger>that.trigger) return 1; return 0; } @Override public long getDelay(TimeUnit unit) { //返回剩余时间 return TimeUnit.NANOSECONDS.convert(System.nanoTime()-trigger,TimeUnit.NANOSECONDS); } @Override public void run() { System.out.println("DelayedTask delayTime ["+delta+"] "+"is running"); } } class DelayedTaskConsumer implements Runnable{ private DelayQueue<DelayedTask> queue ; public DelayedTaskConsumer(DelayQueue<DelayedTask> queue){ this.queue = queue; } @Override public void run() { try{ while(!Thread.interrupted()){queue.take().run();}//取出最先过期的对象,并操作该对象,这里执行了对象的run方法 }catch(InterruptedException e){e.printStackTrace();} } } 输出: DelayedTask delayTime [100] is running DelayedTask delayTime [140] is running DelayedTask delayTime [183] is running DelayedTask delayTime [244] is running DelayedTask delayTime [316] is running DelayedTask delayTime [368] is running DelayedTask delayTime [522] is running DelayedTask delayTime [562] is running DelayedTask delayTime [569] is running DelayedTask delayTime [703] is running DelayedTask delayTime [794] is running DelayedTask delayTime [804] is running DelayedTask delayTime [831] is running DelayedTask delayTime [877] is running DelayedTask delayTime [911] is running DelayedTask delayTime [926] is running DelayedTask delayTime [972] is running DelayedTask delayTime [982] is running DelayedTask delayTime [984] is running DelayedTask delayTime [987] is running
PriorityBlockingQueue
优先级队列,它具有可阻塞的读取操作。该队列总是取出优先级最高的对象。优先级的比较由队列内对象的compareTo方法比较,故对象应实现Comparable接口。
public class PriorityBlockingQueueDemo { public static void main(String[] args) { PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(); ExecutorService exec = Executors.newCachedThreadPool(); Random random = new Random(48); for(int i=0;i<20;i++) queue.put(new PriorityTask(random.nextInt(1000))); exec.execute(new PriorityTaskConsumer(queue)); } } class PriorityTask implements Runnable,Comparable{ private static int count=0; private final int id = count++; private final int priority; public PriorityTask(int priority){ this.priority=priority; } @Override public int compareTo(Object o) { PriorityTask that = (PriorityTask) o; if(this.priority<that.priority) return -1; if(this.priority>that.priority) return 1; return 0; } @Override public void run() { System.out.println("PriorityTask priority ["+priority+"] is runnig"); } } class PriorityTaskConsumer implements Runnable{ private PriorityBlockingQueue<Runnable> queue; Random random = new Random(28); public PriorityTaskConsumer(PriorityBlockingQueue<Runnable> queue){ this.queue = queue; } @Override public void run() { try{ while(!Thread.interrupted()){ queue.take().run(); TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); } }catch(InterruptedException e){e.printStackTrace();} } } 输出: PriorityTask priority [100] is runnig PriorityTask priority [140] is runnig PriorityTask priority [183] is runnig PriorityTask priority [244] is runnig PriorityTask priority [316] is runnig PriorityTask priority [368] is runnig PriorityTask priority [522] is runnig PriorityTask priority [562] is runnig PriorityTask priority [569] is runnig PriorityTask priority [703] is runnig PriorityTask priority [794] is runnig PriorityTask priority [804] is runnig PriorityTask priority [831] is runnig PriorityTask priority [877] is runnig PriorityTask priority [911] is runnig PriorityTask priority [926] is runnig PriorityTask priority [972] is runnig PriorityTask priority [982] is runnig PriorityTask priority [984] is runnig PriorityTask priority [987] is runnig
以上是关于新类库的构件的主要内容,如果未能解决你的问题,请参考以下文章