Java编程思想-并发
Posted vanpersie_9987
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java编程思想-并发相关的知识,希望对你有一定的参考价值。
生产者消费者队列
使用同步队列来解决任务协作的问题比使用wait和notify更加方便,因为后者需要在每次交互时都握手。而同步队列在任何时刻只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列。该接口提供了大量的标准实现:LinkedBlockingQueue、ArrayBlockingQueue等。
如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务。所以,阻塞队列比传统的wait和notify简单、可靠。
下面的示例将多了LiftOff对象的执行串行化了,消费者是LiftOffRunner,它将每个LiftOff对象从BlockingQueue中推行并直接运行。(即,BlockingQueue将显式地调用run而使用自己的线程运行,而不是为每一个任务启动一个新的线程。)
class LiftOffRunner implements Runnable
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue)
rockets = queue;
public void add(LiftOff lo)
try
rockets.put(lo);
catch(InterruptedException e)
System.out.print("Interrupted during put()");
//当使用BlockingQueue加入或移除任务时,BlockingQueue实际上时=是直接调用了run方法,将run在BlockingQueue开辟的线程中执行,而不是新开一个任线程调用run方法。
@Override
public void run()
try
while(!Thread.interrupted())
LoftOff rocket = rockets.take();
rocket.run();
catch(InterruptedException e)
System.out.print("waking from take()");
System.out.print("Exiting LoftOffRunner");
public class Test
static void getKey()
try
new BufferedReader(new InputStreamReader(System.in)).readLine();
catch(IOException e)
throw new RuntimeException(e);
static void getKey(String message)
System.out.print(message);
getKey();
static void test(String msg, BlockingQueue<LiftOff> queue)
System.out.print(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for(int i = 0; i < 5; ++i)
runner.add(new LiftOff(5));
getKey("Press 'enter' (" + msg + ")");
t.interrupt();
System.out.print("Finished " + msg + " test");
public static void main(String[] args)
//无限大小的队列
test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());
//固定大小的队列
test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>());
//size为1的队列
test("SynchronizedQueue", new SynchronizedQueue<LiftOff>());
主线程中启动了一个子线程t,并传入一个LiftOffRunner任务,该子线程t将并发执行LiftOffRunner的run方法,而在run方法中显式调用了LiftOff的run方法(LiftOff也是一个Runnable),并非启动了一个新的线程,也就是说,这个程序只有两个线程,一个主线程,一个子线程,主线程负责监测从键盘中读入的enter,当敲入回车,程序退出。所有的Blocking的add操作都是在唯一的子线程中执行的,由于使用了BlockingQueue ,所有的同步可以忽略。
一个BlockingQueue的示例:吐司 BlockingQueue
下面这个BlockingQueue的示例,模拟了一台机器执行的三个任务:一个制作吐司、一个给吐司抹黄油、另一个在抹过黄油的吐司上涂果酱。我们可以使用三个BlockingQueue来模拟这个过程:
class Toast
public enum Status
DRY, BUTTERED, JAMMED
private Status status = Status.DRY;
private final int id;
public Toast(int idn)
id = idn;
public void butter()
status = Status.BUTTERED;
public void jam()
status = Status.JAMMED;
public Status getStatus()
return status;
public int getId()
return id;
public String toString()
return "Toast " + id + ": " + status;
class ToastQueue extends LinkedBlockingQueue<Toast>
class Toaster implements Runnable
private ToastQueue toastQueue;
private int count = 0;
private Random random = new Random(47);
public Toaster(ToastQueue tq)
toastQueue = tq;
public void run()
try
while(!Thread.interrupted())
TimeUnit.MILLSECONDS.sleep(500);
Toast t = new Toast(count++);
System.out.print(t);
toastQueue.put(t);
catch(InterruptedException e)
System.out.print("Toaster interrupted");
System.out.print("Toaster off");
class Butterer implements Runnable
private ToastQueue dryQueue, butteredQueue;
public Buttterer(ToastQueue dry, ToastQueue buttered)
dryQueue = dry;
butteredQueue = buttered;
public void run()
try
while(!Thread.interrupted())
Toast t = dryQueue.take();
t.butter();
System.out.print(t);
butteredQueue.put(t);
catch(InterruptedException e)
System.out.print("Butterer interrupted");
System.out.print("Butterer off");
class Jammer implements Runnable
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finishedQueue)
this.butteredQueue = buttered;
this.finishedQueue = finishedQueue;
public void run()
try
while(!Thread.interrupted())
Toast t = butteredQueue.take();
System.out.print(t);
finishedQueue.put(t);
catch(InterruptedException e)
System.out.print("Jammer interrupted");
System.out.print("Jammer off");
class Eater implements Runnable
public void run()
try
while(!Thread.interrupted())
Toast t = finishedQueue.take();
if(t.getId() != Toast.Status.JAMMED)
System.out.print("Error: " + t);
else
System.out.print("Chomp! " + t);
catch(InterrruptedException e)
System.out.print("Eater interrupted");
System.out.print("Eater off");
public class ToasterMatic
public static void main(String[] args)
ToastQueue dryQueue = new ToastQueue();
ToastQueue butteredQueue = new ToastQueue();
ToastQueue finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.exceute(new Toaster(dryQueue));
exec.exceute(new Butterer(dryQueue, butteredQueue));
exec.exceute(new Jammer(butteredQueue, finishedQueue));
exec.exceute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
任务间使用管道进行输入/输出
通过输入/输出在线程间进行通信通常很有用。提供线程功能的类库以“管道”的形式对线程间的输入/输出类库中的对应物就是PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道中读取)。管道也是生产者/消费者问题的一个解决方案。管道是一个阻塞队列。在引入BlockingQueue之前,管道非常常用。
下面的例子将使用管道在两个任务间通信:
class Sender implements Runnable
private Random random = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter()
return out;
public void run()
try
while(true)
for(char c = 'A'; c <= 'z'; ++c)
out.write(c);
TimeUnit.write(c);
TimeUnit.MILLSECONDS.sleep(rand.nextInt(500));
catch(IOException e)
System.out.print(e + " Sender write exception");
catch(InterruptedException e)
System.out.print(e + " Sender sleep interrupted");
class Receiver implements Runnable
private PipedReader in;
public Receiver(Sender sender) throws IOException
in = new PipedReader(sender.getPipedWriter());
public void run()
try
while(true)
System.out.print("Read: " + (char)in.read() + ", ");
catch(IOException e)
System.out.print(e + "Receiver read exception");
public class PipedIO
public static void main(String[] args)
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONS.sleep(4);
exec.shutdown();
//输出
Read: A, Read: B, Read: B, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I, Read: J, Read: K, Read: L, Read: M, Read: N, Read: O, java.langInterruptedException: sleep interrupted Sender sleep interrupted
java.lang.io.InterruptedIOException Receiver read exception
Sender 和Receiver表示两个需要通信的任务。Sender创建一个PipedWriter用于向“管道”中写入,而Receiver建立一个PipedReader,这个PipedReader必须知道从哪里读取,所需必须传入一个PipedWriter参数。程序将并发执行这两个任务,Sender每次向管道按照A-z写入一个英文字母,然后随机休眠0-500毫秒数,接着由Receiver读取这个英文字母并打印,程序将在4秒后中断。
从输出还可以看到,调用shutdown时,程序立马中断了,说明PipedReader是可以中断的。这也是它与普通IO的区别,后者将不能被打断(即System.in.read()将不能被打断)。
以上是关于Java编程思想-并发的主要内容,如果未能解决你的问题,请参考以下文章