Java编程思想-并发
Posted vanpersie_9987
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java编程思想-并发相关的知识,希望对你有一定的参考价值。
新类库中的构件
JDK1.5引入了java.util.concurrent包,它提供了大量的新类,用于安全而高效地解决并发问题。下面将通过例子一一介绍。
CountDownLatch-倒数计数器
CountDownLatch被称为倒数计步器,它是Java内置的同步器的一种(还有信号量、CyclicBarrier等同步器,后续将作介绍)。它的功能是阻塞一个或多个线程,这些阻塞的线程需要等待其他线程中的某一个或几个条件成立,一旦成立,这些阻塞的线程将并发执行。比如,有若干运动员等待着(若干线程阻塞)发令枪响起(使解除线程阻塞的条件成立),一旦枪声响起,运动员将开始起跑(解除阻塞),这里发令枪是条件,若干运动员是等待条件成立的线程。而当所有运动员冲过终点线时,或者说最后一个运动员冲过终点线时,计时器停止计时,在这里,所有的运动员又成了条件,而计时器成了达成条件的结果——当计时器在等待着最后一名运动员冲过终点线,条件达成,计时终止。
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。
方法说明:
public void countDown()
- 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。如果当前计数等于零,则不发生任何操作。
public boolean await(long timeout,
TimeUnit unit)
throws InterruptedException- 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。
如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下三种情况之一前,该线程将一直处于休眠状态:1、由于调用 countDown() 方法的次数还不够计数到达零;2、其他某个线程中断当前线程;3、已超出指定的等待时间。如果计数到达零,则该方法返回 true 值。如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在等待时被中断,
则抛出 InterruptedException,并且清除当前线程的已中断状态。如果超出了指定的等待时间,则返回值为 false。如果该时间小于等于零,则此方法根本不会等待。
- 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。
参数:
timeout - 要等待的最长时间
unit - timeout 参数的时间单位。
返回:
如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false
抛出:
InterruptedException - 如果当前线程在等待时被中断
public class CountDownLatchTest
// 模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。
public static void main(String[] args) throws InterruptedException
// 开始的倒数锁
final CountDownLatch begin = new CountDownLatch(1);
// 结束的倒数锁
final CountDownLatch end = new CountDownLatch(10);
// 十名选手
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int index = 0; index < 10; index++)
final int NO = index + 1;
Runnable run = new Runnable()
public void run()
try
// 如果当前计数为零,则此方法立即返回。
// 等待
begin.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + NO + " arrived");
catch (InterruptedException e)
finally
// 每个选手到达终点时,end就减一
end.countDown();
;
exec.submit(run);
System.out.println("Game Start");
// begin减一,开始游戏
begin.countDown();
// 等待end变为0,即所有选手到达终点
end.await();
System.out.println("Game Over");
exec.shutdown();
Game Start
No.9 arrived
No.6 arrived
No.8 arrived
No.7 arrived
No.10 arrived
No.1 arrived
No.5 arrived
No.4 arrived
No.2 arrived
No.3 arrived
Game Over
CyclicBarrier
CyclicBarrier和CountDownLatch一样,都是关于线程的计数器。
用法略有不同:并发执行一组任务,它们并行地执行工作然后再进行下一个步骤前等待, 直到所有的任务都完成。它使得所有的并行任务都将在栅栏处排队,因此可以一致地向前移动。它与CountDownLatch的最主要区别是前者可以多次重用,而后者只能触发一次。
public class TestCyclicBarrier
2
//并发线程数
3 private static final int THREAD_NUM = 5;
4
5 public static class WorkerThread implements Runnable
6
7 CyclicBarrier barrier;
8
9 public WorkerThread(CyclicBarrier b)
10 this.barrier = b;
11
12
13 @Override
14 public void run()
15 // TODO Auto-generated method stub
16 try
17 System.out.println("Worker's waiting");
18 //线程在这里等待,直到所有线程都到达barrier。
19 barrier.await();
20 System.out.println("ID:"+Thread.currentThread().getId()+" Working");
21 catch(Exception e)
22 e.printStackTrace();
23
24
25
26
27
28 /**
29 * @param args
30 */
31 public static void main(String[] args)
32 // TODO Auto-generated method stub
33 CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable()
34 //当所有线程到达barrier时执行
35 @Override
36 public void run()
37 // TODO Auto-generated method stub
38 System.out.println("Inside Barrier");
39
40
41 );
42
43 for(int i=0;i<THREAD_NUM;i++)
44 new Thread(new WorkerThread(cb)).start();
45
46
47
48
//输出:
51 Worker's waiting
52 Worker's waiting
53 Worker's waiting
54 Worker's waiting
55 Worker's waiting
56 Inside Barrier
57 ID:12 Working
58 ID:8 Working
59 ID:11 Working
60 ID:9 Working
61 ID:10 Working
CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。
CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
DelayQueue
1、DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么不会有任何元素,并且poll()将返回null。(正是因为这样,你不能将null放置到这种个队列中。)
2、Delayed
种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
3、DelayQueue队列中保存的是实现了Delayed接口的实现类,里面必须实现getDelay()和compareTo()方法,前者用于取DelayQueue里面的元素时判断是否到了延时时间,否则不予获取,是则获取。 compareTo()方法用于进行队列内部的排序
getDelay(TimeUnit unit)
return unit.convert(time - now(),TimeUnit.NANOSECONDES);//time为设定的间隔时间
compareTo(Object object)
if(object instanceof SchuduledTask)
SchuduledTask task = (SchuduledTask) object ;
long l = this.time - task.time;
if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0
else if(l < 0 ) return -1;
else return 0;
下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”
的任务(到期时间最长的任务)从队列中取出,然后运行它。这样DelayQueue就成了优先级队列的一种变体。
class DelayedTask implements Runnable, Delayed
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<>();
public DelayedTask(int delayMilliseconds)
delta = delayMilliseconds;
trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
public long getDelay(TimeUnit unit)
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
public int compareTo(Delayed arg)
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger)
return -1;
if(trigger > that.trigger)
return 1;
return 0;
public void run()
system.out.print(this + " ");
public String toString()
return String.format("[%1$-4d]", delta) + " Task " + id;
public String summary()
return "(" + id + ":" + delta + ")";
public static class EndSentinel extends EdlayedTask
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e)
super(delay);
exec = e;
public void run()
for(DelayTask pt : sequence)
System.out.print(pt.summary() + " ");
System.out.print();
System.out.print(this + "Calling shutdownNow()");
exec.shutdownNow();
class DelayedTaskConsumer implements Runnable
private DelayQueue<DelayedTaks> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q)
this.q = q;
public void run()
try
while(!Thread.interrupted())
q.take().run();
catch(InterruptedException e)
System.out.print("Finished DelayedTaskConsumer");
public class DelayQueueDemo
public static void main(String[] args)
Random random = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayTask> queue = new DelayQueue<>();
for(int i = 0 ; i < 20; ++i)
queue.put(new DelayTask(random.nextInt(5000)));
queue.add(new DelayedTask.EndSentinel(5000,exec));
exec.execute(new DelayedTaskConsumer(queue));
//输出
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [535 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207 ] Task 9 [1693 ] Task 2 [1809 ] Task 14 [1861 ] Task 3 [2278] Task 15 [3288 ] Task 10 [3551 ] Task 12 [4258 ] Task 0 [4258 ] Task 19 [4522 ] Task 8 [4589 ] Task 13 [4861 ] Task 17 [4868 ] Task 6
(0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
DelayTask包含一个被称为sequence的List<DelayedTask>
,他保存了任务创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的。
Delay接口有一个方法名为getDelay(),它可以用来告知延迟到期有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用TimeUnit类,因为这就是参数类型。这回产生一个非常方便的类,因为你可以很容易地转换单位而无需任何声明。例如,delta的值是以毫秒为单位存储的,但是Java SE5的方法System.nanoTime()产生的时间则是以纳秒为单位的。你可以转换delta的值,方法是声明它的单位以及你希望以什么单位来表示:
NANOSECONDS.convert(delta, MILLISECONDS);
在getDelay()中,希望使用的单位是作为unit参数传递进来的,你使用它将当前与处罚时间之间的差转换为调用者要求的单位,而无需知道这些单位是什么。
注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个县城来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。
从输出中可以看到,任务创建的顺序没有任何影响,任务是按照所期望的延迟顺序执行的。
PriorityBlockingQueue
这是一个很基础的优先级队列,它具有可阻塞的读取操作。这面这个示例演示了PriorityBlockingQueue的用法,其中在优先级队列中的对象是按照优先级顺序从队列中出现的任务。PrioritizedTask被赋予了一个优先级数字,以此来提供这种顺序:
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>
private Random random = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<>();
public PrioritizedTask(int priority)
this.priority = priority;
sequence.add(this);
public int compareTo(PriorityTask arg)
return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0);
public void run()
try
TimeUnit.MILLISECOND.sleep(rand.nextInt(250));
catch(InterruptedException e)
System.out.print(this);
public String toString()
return String.format("[%1$-3d]", priority) + " Task " + id;
public String summary()
return "(" + id + ":" + priority + ")";
public static class EndSentinel extends PrioritizedTask
private ExecutorService exec;
public EndSentinel(ExecutorService e)
super(-1);
exec = e;
public void run()
int count = 0;
for(PrioritizedTask pt : sequence)
System.out.print(pt.summary());
if(++count % 5 == 0)
System.out.print();
System.out.print();
System.out.print(this + " Calling shutdownNow()");
exec.shutdownNow();
public PriorityTaskProducer implements Runnable
private Random random = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PriorityTaskProducer(Queue<Runnable> q, ExecutorService e)
this.queue = q;
this.exec = e;
public void run()
for(int i = 0; i < 20; ++i)
queue.add(new PrioritizedTask(random.nextInt(10)));
Thread.yield();
try
for(int i = 0; i < 10; ++i)
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PriorityTask(10));
for(int i = 0; i < 10; ++i)
queue.add(new PriorityTask(i));
queue.add(new PrioritizedTask.EndSentinel(exec));
catch(InterruptedException e)
System.out.print("Finished PrioritizedTaskProducer");
class PrioritizedTaskConsumer implements Runnable
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q)
this.q = q;
public void run()
try
while(!Thread.interrupted())
q.take().run();
catch(InterruptedException e)
System.out.print("Finished PrioritizedTaskConsumer");
public class PriorityBlockingQueueDemo
public static void main(String[] args)
Random random = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
这与前一个示例相同,PrioritizedTask对象的创建序列被记录在sequence List中,用于和实际的执行顺序比较。run()方法将休眠一小段时间,然后打印对象。
在这里,不需要任何显式同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。
Semaphore-信号量
正常的锁(如synchronized或Lock)在任何时刻都只允许一个任务访问同一项资源,而Semaphore(计数信号量)允许n个任务同时访问这个资源。你还可以将信号量看作是在向外分发使用资源的“许可证”,尽管实际上没有任何许可证对象。
下面演示了信号量的使用。并用到了一个概念——对象池,它管理这个数量有限的对象,当要使用对象时可以签出它们,而在用户使用完毕时,可以将它们签回。这种功能被封装在一个泛型类中:
public class Pool<T>
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkOut;
private Semapore available;
public Pool(Class<T> classObject, int size)
this.size = size;
this.checkOut = new boolean[size];
this.available = new Semaphore(size, true);
for(int i = 0; i < size; ++i)
try
items.add(classObject.newInstance());
catch(Exception e)
throw new RuntimeExecption(e);
public T checkOut() throws InterruptedException
available.acquire();
return getItem();
public void checkIn(T x)
if(releaseItem(x))
available.release();
private synchronized T getItem()
for(int i = 0; i < size; ++i)
if(!checkOut[i])
checkOut[i] = true;
return items.get(i);
return null;
private synchronized boolean releaseItem(T item)
int index = items.indexOf(item);
if(index == -1)
return false;
if(checkOut[index])
checkOut[index] = false;
return true;
return false;
在Pool中,构造方法使用newInstance把对象加载到池中,如果需要一个新的对象,可以调用checkOut(),并在使用完后,嫁给checkIn()。
在checkOut()中,如果没有任何信号量许可证可用——在池中没有更多的对象了,available将阻塞调用过程。在checkIn()中,如果被签入的对象有效,则会向信号量返回一个许可证。
下面使用Fat对象作为示例——Fat类的构造器执行起来很耗时:
public class Fat
private volatile double d;
private static int counter = 0;
private final int id = counter++;
public Fat()
for(int i = 0;i < 10000; ++i)
d += (Math.PI + Math.E) / (double)i;
public void operation()
System.out.println(this);
public String toString()
return "Fat id: " + id;
我们可以使用Pool在管理这个创建耗时的Fat对象,该任务将签出Fat对象,持有一段时间之后再将它们签入,以此来测试Pool这个类:
class CheckoutTask<T> implements Runnable
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool)
this.pool = pool;
public void run()
T item = pool.checkOut();
System.out.print(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.print(this + "checking in " + item);
pool.checkIn(item);
catch(InterruptedException e)
public String toString()
return "CheckoutTask " + id + " ";
public class SemaphoreDemo
final static int SIZE = 25;
public static void main(String[] args)
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0;i < SIZE; ++i)
exec.execute(new CheckoutTask<Fat>(pool));
System.out.print("All CheckoutTasks created");
List<Fat> list = new ArrayList<>();
for(int i = 0;i < SIZE; ++i)
Fat f = pool.checkOut();
System.out.print(i + ": main() thread checked out ");
f.operation();
list.add(f);
Future<?> bolcked = exec.submit(new Runnable()
public void run()
try
pool.checkOut();
catch(InterruptedException e)
System.out.print("checkOut() Interrupted");
);
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true);
System.out.print("Checking in objects in " + list);
for(Fat f : list)
pool.checkIn(f);
for(Fat f : list)
pool.checkIn(f);
exec.shutdown();
这个示例依赖于Pool客户端严格地并愿意签入所持有的的对象, 当其工作时,这是最简单的解决方案。
以上是关于Java编程思想-并发的主要内容,如果未能解决你的问题,请参考以下文章