Java实现生产者消费者问题与读者写者问题
Posted 刀刀(2把刀)
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java实现生产者消费者问题与读者写者问题相关的知识,希望对你有一定的参考价值。
摘要: Java实现生产者消费者问题与读者写者问题
1、生产者消费者问题
生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步;(2)在生产者和消费者之间建立一个管道。第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。在Java中一共有五种方法支持同步,其中前四个是同步方法,一个是管道方法。
- wait() / notify()方法
- await() / signal()方法
- BlockingQueue阻塞队列方法
- Semaphore方法
- PipedInputStream / PipedOutputStream
1.1 wait() / notify()方法
wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。
notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
各起了4个生产者,4个消费者
package test; public class Hosee { private static Integer count = 0; private final Integer FULL = 10; private static String LOCK = "LOCK"; class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); LOCK.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); LOCK.notifyAll(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
(需要注意的是,用什么加锁就用什么notify和wait,实例中使用的是LOCK)
部分打印结果:由于生产者和消费者说明一致,所以最多都是在2左右,当减少一个消费者时,则会加到10。
1.2 await() / signal()方法
首先,我们先来看看await()/signal()与wait()/notify()的区别:
- wait()和notify()必须在synchronized的代码块中使用 因为只有在获取当前对象的锁时才能进行这两个操作 否则会报异常 而await()和signal()一般与Lock()配合使用。
- wait是Object的方法,而await只有部分类有,如Condition。
- await()/signal()和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。
那么为什么有了synchronized还要提出Lock呢?
1.2.1 对synchronized的改进
synchronized并不完美,它有一些功能性的限制 —— 它无法中断一个正在等候获得锁的线程,也无法通过投票得到锁,如果不想等下去,也就没法得到锁。同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行,多数情况下,这没问题(而且与异常处理交互得很好),但是,确实存在一些非块结构的锁定更合适的情况。
1.2.2 ReentrantLock 类
java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,而不是作为语言的特性来实现(更加面向对象)。这就为 Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。 ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)
reentrant 锁意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。
简单解释下重入锁:
public class Child extends Father implements Runnable{
final static Child child = new Child();//为了保证锁唯一
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
new Thread(child).start();
}
}
public synchronized void doSomething() {
System.out.println("1child.doSomething()");
doAnotherThing(); // 调用自己类中其他的synchronized方法
}
private synchronized void doAnotherThing() {
super.doSomething(); // 调用父类的synchronized方法
System.out.println("3child.doAnotherThing()");
}
@Override
public void run() {
child.doSomething();
}
}
class Father {
public synchronized void doSomething() {
System.out.println("2father.doSomething()");
}
}
上述代码的锁都是child对象,当执行child.doSomething时,该线程获得child对象的锁,在doSomething方法内执行doAnotherThing时再次请求child对象的锁,因为synchronized是重入锁,所以可以得到该锁,继续在doAnotherThing里执行父类的doSomething方法时第三次请求child对象的锁,同理可得到,如果不是重入锁的话,那这后面这两次请求锁将会被一直阻塞,从而导致死锁。
在查看下面代码示例时,可以看到 Lock 和 synchronized 有一点明显的区别 —— lock 必须在 finally 块中释放。否则,如果受保护的代码将抛出异常,锁就有可能永远得不到释放!这一点区别看起来可能没什么,但是实际上,它极为重要。忘记在 finally 块中释放锁,可能会在程序中留下一个定时炸弹,当有一天炸弹爆炸时,您要花费很大力气才有找到源头在哪。而使用同步,JVM 将确保锁会获得自动释放。
Lock lock = new ReentrantLock();
lock.lock();
try {
// update object state
}
finally {
lock.unlock();
}
除此之外,与目前的 synchronized 实现相比,争用下的 ReentrantLock 实现更具可伸缩性。(在未来的 JVM 版本中,synchronized 的争用性能很有可能会获得提高。)这意味着当许多线程都在争用同一个锁时,使用 ReentrantLock 的总体开支通常要比 synchronized 少得多。
1.2.3 什么时候选择用 ReentrantLock 代替 synchronized
在 Java1.5 中,synchronized 是性能低效的。因为这是一个重量级操作,需要调用操作接口,导致有可能加锁消耗的系统时间比加锁以外的操作还多。相比之下使用 Java 提供的 Lock 对象,性能更高一些。但是到了 Java1.6,发生了变化。synchronized 在语义上很清晰,可以进行很多优化,有适应自旋,锁消除,锁粗化,轻量级锁,偏向锁等等。导致在 Java1.6 上 synchronized 的性能并不比 Lock 差。官方也表示,他们也更支持 synchronized,在未来的版本中还有优化余地。
所以在确实需要一些 synchronized 所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票使用ReentrantLock。ReentrantLock 还具有可伸缩性的好处,应当在高度争用的情况下使用它,但是请记住,大多数 synchronized 块几乎从来没有出现过争用,所以可以把高度争用放在一边。我建议用 synchronized 开发,直到确实证明 synchronized 不合适,而不要仅仅是假设如果使用 ReentrantLock “性能会更好”。请记住,这些是供高级用户使用的高级工具。(而且,真正的高级用户喜欢选择能够找到的最简单工具,直到他们认为简单的工具不适用为止。)。一如既往,首先要把事情做好,然后再考虑是不是有必要做得更快。
1.2.4 接下来我们使用ReentrantLock来实现生产者消费者问题
package test;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Hosee {
private static Integer count = 0;
private final Integer FULL = 10;
final Lock lock = new ReentrantLock();
final Condition NotFull = lock.newCondition();
final Condition NotEmpty = lock.newCondition();
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
while (count == FULL) {
try {
NotFull.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
NotEmpty.signal();
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
lock.lock();
try {
while (count == 0) {
try {
NotEmpty.await();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
NotFull.signal();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) throws Exception {
Hosee hosee = new Hosee();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
}
}
运行结果与第一个类似。上述代码用了两个Condition,其实用一个也是可以的,只不过要signalall()。
1.3 BlockingQueue阻塞队列方法
BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Hosee {
private static Integer count = 0;
final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
try {
bq.put(1);
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
bq.take();
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Hosee hosee = new Hosee();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
}
}
其实这个BlockingQueue比较难用代码来演示,因为put()与take()方法无法与输出语句保证同步,当然你可以自己去实现 BlockingQueue(BlockingQueue是用await()/signal() 实现的)。所以在输出结果上你会发现不匹配。
例如:当缓冲区已满,生产者在put()操作时,put()内部调用了await()方法,放弃了线程的执行,然后消费者线程执行,调用take()方法,take()内部调用了signal()方法,通知生产者线程可以执行,致使在消费者的println()还没运行的情况下生产者的println()先被执行,所以有了输出不匹配的情况。
对于BlockingQueue大家可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。
1.4 Semaphore方法
Semaphore 信号量,就是一个允许实现设置好的令牌。也许有1个,也许有10个或更多。谁拿到令牌(acquire)就可以去执行了,如果没有令牌则需要等待。
执行完毕,一定要归还(release)令牌,否则令牌会被很快用光,别的线程就无法获得令牌而执行下去了。
package test;
import java.util.concurrent.Semaphore;
public class Hosee
{
int count = 0;
final Semaphore notFull = new Semaphore(10);
final Semaphore notEmpty = new Semaphore(0);
final Semaphore mutex = new Semaphore(1);
class Producer implements Runnable
{
@Override
public void run()
{
for (int i = 0; i < 10; i++)
{
try
{
Thread.sleep(3000);
}
catch (Exception e)
{
e.printStackTrace();
}
try
{
notFull.acquire();//顺序不能颠倒,否则会造成死锁。
mutex.acquire();
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
mutex.release();
notEmpty.release();
}
}
}
}
class Consumer implements Runnable
{
@Override
public void run()
{
for (int i = 0; i < 10; i++)
{
try
{
Thread.sleep(3000);
}
catch (InterruptedException e1)
{
e1.printStackTrace();
}
try
{
notEmpty.acquire();//顺序不能颠倒,否则会造成死锁。
mutex.acquire();
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
mutex.release();
notFull.release();
}
}
}
}
public static void main(String[] args) throws Exception
{
Hosee hosee = new Hosee();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
}
}
注意notFull.acquire()与mutex.acquire()的位置不能互换,如果先得到互斥锁再发生等待,会造成死锁。
1.5 PipedInputStream / PipedOutputStream
这个类位于java.io包中,是解决同步问题的最简单的办法,一个线程将数据写入管道,另一个线程从管道读取数据,这样便构成了一种生产者/消费者的缓冲区编程模式。PipedInputStream/PipedOutputStream只能用于多线程模式,用于单线程下可能会引发死锁。
package test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Hosee {
final PipedInputStream pis = new PipedInputStream();
final PipedOutputStream pos = new PipedOutputStream();
{
try {
pis.connect(pos);
} catch (IOException e) {
e.printStackTrace();
}
}
class Producer implements Runnable {
@Override
public void run() {
try{
while(true){
int b = (int) (Math.random() * 255);
System.out.println("Producer: a byte, the value is " + b);
pos.write(b);
pos.flush();
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
pos.close();
pis.close();
}catch(IOException e){
System.out.println(e);
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
try{
while(true){
int b = pis.read();
System.out.println("Consumer: a byte, the value is " + String.valueOf(b));
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
pos.close();
pis.close();
}catch(IOException e){
System.out.println(e);
}
}
}
}
public static void main(String[] args) throws Exception {
Hosee hosee = new Hosee();
new Thread(hosee.new Producer()).start();
new Thread(hosee.new Consumer()).start();
}
}
与阻塞队列一样,由于read()/write()方法与输出方法不一定同步,输出结果方面会发生不匹配现象,为了使结果更加明显,这里只有1个消费者和1个生产者。
2、读者写者问题
读者—写者问题(Readers-Writers problem)也是一个经典的并发程序设计问题,是经常出现的一种同步问题。计算机系统中的数据(文件、记录)常被多个进程共享,但其中某些进程可能只要求读数据(称为读者Reader);另一些进程则要求修改数据(称为写者Writer)。就共享数据而言,Reader和Writer是两组并发进程共享一组数据区,要求:
(1)允许多个读者同时执行读操作;
(2)不允许读者、写者同时操作;
(3)不允许多个写者同时操作。
Reader和Writer的同步问题分为读者优先、弱写者优先(公平竞争)和强写者优先三种情况,它们的处理方式不同。
首先我们都只考虑公平竞争的情况下,看看Java有哪些方法可以实现读者写者问题
2.1 读写锁
ReentrantReadWriteLock会使用两把锁来解决问题,一个读锁,一个写锁
线程进入读锁的前提条件:
没有其他线程的写锁,
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
到ReentrantReadWriteLock,首先要做的是与ReentrantLock划清界限。它和后者都是单独的实现,彼此之间没有继承或实现的关系。然后就是总结这个锁机制的特性了:
- 重入(在上文ReentrantLock处已经介绍了)方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock想要获得WriteLock则永远都不要想。
- WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能,为什么?参看(1),呵呵.
- ReadLock可以被多个线程持有并且在作用时排斥任何的WriteLock,而WriteLock则是完全的互斥。这一特性最为重要,因为对于高读取频率而相对较低写入的数据结构,使用此类锁同步机制则可以提高并发量。
- 不管是ReadLock还是WriteLock都支持Interrupt,语义与ReentrantLock一致。
- WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。
看下ReentrantReadWriteLock这个类的两个构造函数
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
fair这个参数表示是否是创建一个公平的读写锁,还是非公平的读写锁。也就是抢占式还是非抢占式。
公平和非公平:公平表示获取的锁的顺序是按照线程加锁的顺序来分配获取到锁的线程时最先加锁的线程,是按照FIFO的顺序来分配锁的;非公平表示获取锁的顺序是无需的,后来加锁的线程可能先获得锁,这种情况就导致某些线程可能一直没获取到锁。
公平锁为啥会影响性能,从code上来看看公平锁仅仅是多了一项检查是否在队首会影响性能,如不是,那么又是在什么地方影响的?假如是闯入的线程,会排在队尾并睡觉(parking)等待前任节点唤醒,这样势必会比非公平锁添加很多paking和unparking的操作
一般的应用场景是: 如果有多个读线程,一个写线程,而且写线程在操作的时候需要阻塞读线程,那么此时就需要使用公平锁,要不然可能写线程一直获取不到锁,导致线程饿死。
再简单说下锁降级
重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
if (!cacheValid) {
data = ...
cacheValid = true;
}
rwl.readLock().lock();
rwl.writeLock().unlock(); // 降级:先获取读锁再释放写锁
}
下面我们用读写锁来实现读者写者问题
import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
public static void main(String[] args) {
final Queue3 q3 = new Queue3();
for (int i = 0; i < 3; i++) {
new Thread() {
public void run() {
while (true) {
q3.get();
}
}
}.start();
}
for (int i = 0; i < 3; i++) {
new Thread() {
public void run() {
while (true) {
q3.put(new Random().nextInt(10000));
}
}
}.start();
}
}
}
class Queue3 {
private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public void get() {
rwl.readLock().lock();// 上读锁,其他线程只能读不能写
System.out.println(Thread.currentThread().getName()
+ " be ready to read data!");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ "have read data :" + data);
rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面
}
public void put(Object data) {
rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
System.out.println(Thread.currentThread().getName()
+ " be ready to write data!");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
System.out.println(Thread.currentThread().getName()
+ " have write data: " + data);
rwl.writeLock().unlock();// 释放写锁
}
}
运行结果:
Thread-0 be ready to read data!
Thread-1 be ready to read data!
Thread-2 be ready to read data!
Thread-0have read data :null
Thread-2have read data :null
Thread-1have read data :null
Thread-5 be ready to write data!
Thread-5 have write data: 6934
Thread-5 be ready to write data!
Thread-5 have write (王道408考研操作系统)第二章进程管理-第三节9:读者写者问题
经典进程的同步问题(生产者--消费者问题哲学家进餐问题读者--写者问题)
操作系统 | 经典进程的同步问题(生产者--消费者问题哲学家进餐问题读者--写者问题)