线程通信,是指线程之间的消息传递。
多个线程在操作同一个资源时,它们对共享资源的操作动作可能不同;它们共享同一个资源,互为条件,相互依赖,相互通信,从而让任务向前推进。
另外,在线程的同步策略中,虽然可以解决并发更新同一个资源,保障资源的安全,但不能用来实现线程间的消息传递。因此,线程通信与线程同步往往会融合使用。
生产者消费者模型堪称是线程通信中的一个典型案例,我们接下来通过生产者消费者模式来进一步认识线程通信。在此,我们先对若干概念进行了解。
生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费
消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费
线程通信:使用java中超类Object中提供的一些方法:
1 public final void wait(); //注:long timeout=0 表示线程一直等待,直到其它线程通知 2 public final native void wait(long timeout); //线程等待指定毫秒参数的时间,超过该时间则不再等待 3 public final void wait(long timeout, int nanos); //线程等待指定毫秒、微妙的时间,timeout最大等待时间,以毫秒为单位,nanos额外的时间,在纳秒范围0-9999994 public final native void notify(); //唤醒一个处于等待状态的线程 5 public final native void notifyAll(); //唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行
需要注意的是,上述方法只能在同步方法或者同步代码块中使用,否则会抛出异常。
接下来,我们以生产A-D个产品,放入仓库,待消费者消费后,生产者再进行生产为例,看下生产者消费者模式的运行流程。
1 /** 2 * 1.共享资源缓存和操作类 3 */ 4 public class SharedCache { 5 //产品,此处使用char字符,作为存储共享数据的数据类型 6 private char cache; 7 //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费) 8 private boolean flag=false; 9 /* 10 生产操作(生产者):向仓库中添加共享数据 11 */ 12 public synchronized void addSharedCacheData(char data){ 13 //产品未消费,则生产者的生产操作等待 14 if(flag){ 15 System.out.println("产品未消费,生产者的生产操作等待"); 16 try { 17 //生产者等待 18 wait(); 19 } catch (InterruptedException e) { 20 System.out.println("Thread interrupted Exception:"+e.getMessage()); 21 } 22 } 23 //产品已消费,则生产者继续生产 24 this.cache=data; 25 //标记已生产 26 flag=true; 27 //通知消费者已生产 28 notify(); 29 System.out.println("生产者--->产品:"+data+"已生产,等待消费者消费"); 30 } 31 /* 32 消费操作(消费者):向仓库中获取共享数据 33 */ 34 public synchronized char getSharedCacheData(){ 35 //如果产品未生产,则消费者等待 36 if(!flag){ 37 System.out.println("产品未生产,消费者的消费操作等待"); 38 try { 39 wait(); 40 } catch (InterruptedException e) { 41 System.out.println("Thread interrupted Exception:"+e.getMessage()); 42 } 43 } 44 //标记已消费 45 flag=false; 46 //通知生产者已消费 47 notify(); 48 System.out.println("消费者--->产品:"+this.cache+"已消费,通知生产者生产"); 49 return this.cache; 50 } 51 } 52 /** 53 * 2.生产者线程类 54 */ 55 public class Producer extends Thread{ 56 //共享缓存资源类的对象 57 private SharedCache cache; 58 //构造器,传入共享资源类的对象 59 public Producer(SharedCache cache){ 60 this.cache=cache; 61 } 62 /* 63 生产者生产产品,放入共享资源缓存类(相当于将生产的产品放入仓库里) 64 生产A-D类型的产品 65 */ 66 @Override 67 public void run() { 68 for(char product=‘A‘;product<=‘D‘;product++){ 69 try { 70 sleep((int)(Math.random()*3000)); 71 } catch (InterruptedException e) { 72 System.out.println("Thread interrupted Exception:"+e.getMessage()); 73 } 74 //生产产品,放入共享缓存数据类的对象里(相当于把生产的产品放到仓库里) 75 cache.addSharedCacheData(product); 76 } 77 } 78 } 79 /** 80 * 3.消费者线程类 81 */ 82 public class Consumer extends Thread{ 83 //共享缓存资源类的对象 84 private SharedCache cache; 85 //构造器,传入共享资源类的对象 86 public Consumer(SharedCache cache){ 87 this.cache=cache; 88 } 89 /* 90 消费者消费产品,获取共享缓存类的对象里的数据(相当于从仓库里提取产品) 91 当消费到D类型的产品时即停止消费 92 */ 93 @Override 94 public void run() { 95 char product=‘a‘; 96 do{ 97 try { 98 Thread.sleep((int)(Math.random()*3000)); 99 } catch (InterruptedException e) { 100 System.out.println("Thread interrupted Exception:"+e.getMessage()); 101 } 102 //消费,从仓库取走商品 103 product=cache.getSharedCacheData(); 104 }while (product!=‘D‘); 105 } 106 } 107 /** 108 * 4.线程通信测试类 109 */ 110 public class Test { 111 public static void main(String[] args) { 112 //生产者与消费者共享同一个资源 113 SharedCache cache = new SharedCache(); 114 //启动消费者线程 115 new Consumer(cache).start(); 116 //启动生产者线程 117 new Producer(cache).start(); 118 } 119 }
运行上述的测试类后,执行结果如下:
产品未生产,消费者的消费操作等待 生产者--->产品:A已生产,等待消费者消费 消费者--->产品:A已消费,通知生产者生产 生产者--->产品:B已生产,等待消费者消费 消费者--->产品:B已消费,通知生产者生产 生产者--->产品:C已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者--->产品:C已消费,通知生产者生产 生产者--->产品:D已生产,等待消费者消费 消费者--->产品:D已消费,通知生产者生产
我们在上面完成的生产者消费者模型,在处理线程同步问题时,主要是用了synchronized同步方法,JDK 1.5提供了多线程升级方案,将同步synchronized替换成了显示的Lock操作,可以实现唤醒、冻结指定的线程。
接口Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。Lock 可以支持多个相关的 Condition 对象,从而在使用中更加灵活。
接口Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。该对象可以通过Lock锁进行获取。可以说,传统线程的通信方式,Condition都可以实现。
需要注意的是,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。
Java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,从而为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。
其中,ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,还添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。
我们接下来通过ReentrantLock 类和Condition接口的实现类来完成一个生产者消费者模型。为此,我们需要创建一个ReentrantLock类的多态对象,即建立一把锁,然后将这把锁与两个Condition对象关联。我们接下来就用Lock与Condition实现一个生产者消费者模型,实现与上述例子相似的效果,代码具体如下:
1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 /** 5 * 共享的资源 6 */ 7 public class Resource { 8 private char product; 9 // private int count = 1; 10 //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费) 11 private boolean flag = false; 12 //定义一个实现Lock接口的ReentrantLock类对象 13 private Lock lock = new ReentrantLock(); 14 /* 15 Condition是被绑定到Lock上的, 16 要创建一个Lock的Condition, 17 必须用Lock对象的newCondition()方法 18 */ 19 private Condition cond_pro = lock.newCondition(); 20 //一个lock可以有多个相关的condition 21 private Condition cond_con = lock.newCondition(); 22 /* 23 定义生产方法 24 */ 25 public void produce(char product) throws InterruptedException { 26 lock.lock();//手动加同步锁 27 try { 28 while (flag) {//此时若生产完一个以后唤醒了另一个生产者,则再次判断,避免两个生产者同时生产 29 System.out.println("产品未消费,生产者的生产操作等待"); 30 cond_pro.await(); 31 } 32 this.product = product; 33 //标记已生产 34 flag = true; 35 //通知消费者已生产 36 cond_con.signal();//唤醒消费方法,利用了condition的signal()指定唤醒对象 37 System.out.println("生产者"+Thread.currentThread().getName()+"--->产品:"+product+"已生产,等待消费者消费"); 38 } finally { 39 lock.unlock();//释放锁 40 } 41 } 42 /* 43 定义消费方法 44 */ 45 public void consume() throws InterruptedException { 46 lock.lock(); 47 try { 48 while (!flag) { 49 System.out.println("产品未生产,消费者的消费操作等待"); 50 cond_con.await(); 51 } 52 //标记已消费 53 flag = false; 54 //通知生产者已消费 55 cond_pro.signal(); 56 System.out.println("消费者"+Thread.currentThread().getName()+"--->产品:"+this.product+"已消费,通知生产者生产"); 57 } finally { 58 lock.unlock(); 59 } 60 } 61 } 62 /** 63 * 生产者 64 */ 65 public class Producer implements Runnable{ 66 private Resource res; 67 public Producer(Resource res){ 68 this.res=res; 69 } 70 @Override 71 public void run() { 72 char product=‘A‘; 73 while(product<‘E‘){ 74 try { 75 res.produce(product); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 product++; 80 } 81 } 82 } 83 /** 84 * 消费者 85 */ 86 public class Consumer implements Runnable{ 87 private Resource res; 88 public Consumer(Resource res){ 89 this.res=res; 90 } 91 @Override 92 public void run() { 93 char product=‘A‘; 94 while(product<‘E‘){ 95 try { 96 res.consume(); 97 } catch (InterruptedException e) { 98 e.printStackTrace(); 99 } 100 product++; 101 } 102 } 103 } 104 /** 105 * 用ReentrantLock和Condition实现生产者消费者模型 106 */ 107 public class Test { 108 //入口方法 109 public static void main(String[] args) { 110 Resource res = new Resource();//生产者与消费者共享的资源 111 Producer producer = new Producer(res);//生产者 112 Consumer consumer = new Consumer(res);//消费者 113 //生产者线程与消费者线程各创建两个 114 Thread p1 = new Thread(producer); 115 Thread p2 = new Thread(producer); 116 Thread c1 = new Thread(consumer); 117 Thread c2 = new Thread(consumer); 118 p1.start(); 119 p2.start(); 120 c1.start(); 121 c2.start(); 122 } 123 }
上述代码执行结果如下:
生产者Thread-0--->产品:A已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:A已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:A已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:A已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-0--->产品:B已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-3--->产品:B已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:B已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:B已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-0--->产品:C已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-3--->产品:C已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:C已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:C已消费,通知生产者生产 生产者Thread-0--->产品:D已生产,等待消费者消费 消费者Thread-3--->产品:D已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:D已生产,等待消费者消费 消费者Thread-3--->产品:D已消费,通知生产者生产