Java编程思想-并发

Posted vanpersie_9987

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java编程思想-并发相关的知识,希望对你有一定的参考价值。

生产者消费者队列

使用同步队列来解决任务协作的问题比使用wait和notify更加方便,因为后者需要在每次交互时都握手。而同步队列在任何时刻只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列。该接口提供了大量的标准实现:LinkedBlockingQueue、ArrayBlockingQueue等。

如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务。所以,阻塞队列比传统的wait和notify简单、可靠。

下面的示例将多了LiftOff对象的执行串行化了,消费者是LiftOffRunner,它将每个LiftOff对象从BlockingQueue中推行并直接运行。(即,BlockingQueue将显式地调用run而使用自己的线程运行,而不是为每一个任务启动一个新的线程。

class LiftOffRunner implements Runnable 
    private BlockingQueue<LiftOff> rockets;
    public LiftOffRunner(BlockingQueue<LiftOff> queue) 
        rockets = queue;
    
    public void add(LiftOff lo) 
        try 
            rockets.put(lo);
         catch(InterruptedException e) 
            System.out.print("Interrupted during put()");
        

    
    //当使用BlockingQueue加入或移除任务时,BlockingQueue实际上时=是直接调用了run方法,将run在BlockingQueue开辟的线程中执行,而不是新开一个任线程调用run方法。
    @Override
    public void run() 
        try 
            while(!Thread.interrupted()) 
                LoftOff rocket = rockets.take();
                rocket.run();
            
         catch(InterruptedException e) 
            System.out.print("waking from take()");
        
        System.out.print("Exiting LoftOffRunner");
    


public class Test 
    static void getKey() 
        try 
            new BufferedReader(new InputStreamReader(System.in)).readLine();
         catch(IOException e) 
            throw new RuntimeException(e);

        
    
    static void getKey(String message) 
        System.out.print(message);
        getKey();

    
    static void test(String msg, BlockingQueue<LiftOff> queue) 
        System.out.print(msg);
        LiftOffRunner runner = new LiftOffRunner(queue);
        Thread t = new Thread(runner);
        t.start();
        for(int i = 0; i < 5; ++i) 
            runner.add(new LiftOff(5));
            getKey("Press 'enter' (" + msg + ")");
            t.interrupt();
            System.out.print("Finished " + msg + " test");
        
    
    public static void main(String[] args) 
        //无限大小的队列
        test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());
        //固定大小的队列
        test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>());
        //size为1的队列
        test("SynchronizedQueue", new SynchronizedQueue<LiftOff>());
    

主线程中启动了一个子线程t,并传入一个LiftOffRunner任务,该子线程t将并发执行LiftOffRunner的run方法,而在run方法中显式调用了LiftOff的run方法(LiftOff也是一个Runnable),并非启动了一个新的线程,也就是说,这个程序只有两个线程,一个主线程,一个子线程,主线程负责监测从键盘中读入的enter,当敲入回车,程序退出。所有的Blocking的add操作都是在唯一的子线程中执行的,由于使用了BlockingQueue ,所有的同步可以忽略。

一个BlockingQueue的示例:吐司 BlockingQueue

下面这个BlockingQueue的示例,模拟了一台机器执行的三个任务:一个制作吐司、一个给吐司抹黄油、另一个在抹过黄油的吐司上涂果酱。我们可以使用三个BlockingQueue来模拟这个过程:

class Toast 
    public enum Status 
        DRY, BUTTERED, JAMMED
    
    private Status status = Status.DRY;
    private final int id;
    public Toast(int idn) 
        id = idn;
    
    public void butter() 
        status = Status.BUTTERED;
    
    public void jam() 
        status = Status.JAMMED;
    
    public Status getStatus() 
        return status;
    
    public int getId() 
        return id;
    
    public String toString() 
        return "Toast " + id + ": " + status;
    



class ToastQueue extends LinkedBlockingQueue<Toast> 



class Toaster implements Runnable 
    private ToastQueue toastQueue;
    private int count = 0;
    private Random random = new Random(47);
    public Toaster(ToastQueue tq) 
        toastQueue = tq;
    
    public void run() 
        try 
            while(!Thread.interrupted()) 
                TimeUnit.MILLSECONDS.sleep(500);
                Toast t = new Toast(count++);
                System.out.print(t);
                toastQueue.put(t);
            
         catch(InterruptedException e) 
            System.out.print("Toaster interrupted");

        
        System.out.print("Toaster off");

    


class Butterer implements Runnable 
    private ToastQueue dryQueue, butteredQueue;
    public Buttterer(ToastQueue dry, ToastQueue buttered) 
        dryQueue = dry;
        butteredQueue = buttered;
    
    public void run() 
        try 
            while(!Thread.interrupted()) 
                Toast t = dryQueue.take();
                t.butter();
                System.out.print(t);
                butteredQueue.put(t);
            
         catch(InterruptedException e) 
            System.out.print("Butterer interrupted");
        
        System.out.print("Butterer off");
    




class Jammer implements Runnable 
    private ToastQueue butteredQueue, finishedQueue;

    public Jammer(ToastQueue buttered, ToastQueue finishedQueue) 
        this.butteredQueue = buttered;
        this.finishedQueue = finishedQueue;
    
    public void run() 
        try 
            while(!Thread.interrupted()) 
                Toast t = butteredQueue.take();
                System.out.print(t);
                finishedQueue.put(t);
            
         catch(InterruptedException e) 
            System.out.print("Jammer interrupted");

        
        System.out.print("Jammer off");

    


class Eater implements Runnable 
    public void run() 
        try 
            while(!Thread.interrupted()) 
                Toast t = finishedQueue.take();
                if(t.getId() != Toast.Status.JAMMED) 
                    System.out.print("Error: " + t);
                 else 
                    System.out.print("Chomp! " + t);
                
             catch(InterrruptedException e) 
                System.out.print("Eater interrupted");
            
        
        System.out.print("Eater off");

    



public class ToasterMatic 
    public static void main(String[] args) 
        ToastQueue dryQueue = new ToastQueue();
        ToastQueue butteredQueue = new ToastQueue();
        ToastQueue finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.exceute(new Toaster(dryQueue));
        exec.exceute(new Butterer(dryQueue, butteredQueue));
        exec.exceute(new Jammer(butteredQueue, finishedQueue));
        exec.exceute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    

任务间使用管道进行输入/输出

通过输入/输出在线程间进行通信通常很有用。提供线程功能的类库以“管道”的形式对线程间的输入/输出类库中的对应物就是PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道中读取)。管道也是生产者/消费者问题的一个解决方案。管道是一个阻塞队列。在引入BlockingQueue之前,管道非常常用。

下面的例子将使用管道在两个任务间通信:

class Sender implements Runnable 
    private Random random = new Random(47);
    private PipedWriter out = new PipedWriter();
    public PipedWriter getPipedWriter() 
        return out;
    
    public void run() 
        try 
            while(true) 
                for(char c = 'A'; c <= 'z'; ++c) 
                    out.write(c);
                    TimeUnit.write(c);
                            TimeUnit.MILLSECONDS.sleep(rand.nextInt(500));
                
            
         catch(IOException e) 
            System.out.print(e + " Sender write exception");
         catch(InterruptedException e) 
            System.out.print(e + " Sender sleep interrupted");
        
    


class Receiver implements Runnable 
    private PipedReader in;
    public Receiver(Sender sender) throws IOException 
        in = new PipedReader(sender.getPipedWriter());

    
    public void run() 
        try 
            while(true) 
            System.out.print("Read: " + (char)in.read() + ", ");
            
         catch(IOException e) 
            System.out.print(e + "Receiver read exception");
        

    


public class PipedIO 
    public static void main(String[] args) 
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);
        TimeUnit.SECONS.sleep(4);
        exec.shutdown();
    
//输出
Read: A, Read: B, Read: B, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I, Read: J, Read: K, Read: L, Read: M, Read: N, Read: O, java.langInterruptedException: sleep interrupted Sender sleep interrupted
java.lang.io.InterruptedIOException Receiver read exception

Sender 和Receiver表示两个需要通信的任务。Sender创建一个PipedWriter用于向“管道”中写入,而Receiver建立一个PipedReader,这个PipedReader必须知道从哪里读取,所需必须传入一个PipedWriter参数。程序将并发执行这两个任务,Sender每次向管道按照A-z写入一个英文字母,然后随机休眠0-500毫秒数,接着由Receiver读取这个英文字母并打印,程序将在4秒后中断。

从输出还可以看到,调用shutdown时,程序立马中断了,说明PipedReader是可以中断的。这也是它与普通IO的区别,后者将不能被打断(即System.in.read()将不能被打断)。

以上是关于Java编程思想-并发的主要内容,如果未能解决你的问题,请参考以下文章

Java编程思想之并发

Java编程思想之二十 并发

Java编程思想-并发

Java编程思想读书笔记--第21章并发

Java编程思想-并发

Java编程思想-并发