生产者消费者模型

Posted jiarui-zjb

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者消费者模型相关的知识,希望对你有一定的参考价值。

1、生产者消费者模型作用和示例如下:
1)通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率 ,这是生产者消费者模型最重要的作用
2)解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费者之间的联系少,联系越少越可以独自发展而不需要收到相互的制约
备注:对于生产者消费者模型的理解将在并发队列BlockingQueue章节进行说明,本章不做详细介绍。

技术分享图片
package threadLearning.productCustomerModel;
/*
    wait/notify 机制:以资源为例,生产者生产一个资源,通知消费者就消费掉一个资源,生产者继续生产资源,消费者消费资源,以此循环。
    wait():使一个线程处于等待(阻塞)状态,并且释放所持有的对象的锁;
    sleep(): 使一个正在运行的线程处于睡眠状态, 是一个静态方法, 调用此方法要处理 InterruptedException 异常;
    notify():唤醒一个处于等待状态的线程,当然在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程而是由 JVM 确定唤醒哪个线程,而且与优先级无关;
    notityAll():唤醒所有处于等待状态的线程,该方法并不是将对象的锁给所有线程,而是让它们竞争,只有获得锁的线程才能进入就绪状态;
备注:java 5 通过 Lock 接口提供了显示的锁机制,Lock 接口中定义了加锁(lock()方法)和解锁(unLock()方法),增强了多线程编程的灵活性及对线程的协调
*/

//资源对象:包含商品名属性;提供生产和消费方法;
class Resource {
    private String name;//商品名
    private int count = 0;
    private boolean flag = false;//生产或者消费的控制开关
    public synchronized void set(String name) {
        // 生产资源
        while (flag) {
            try {
                // 线程等待。消费者消费资源
                wait();
            } catch (Exception e) {
            }
        }
        this.name = name + "---" + count++;
        System.out.println(Thread.currentThread().getName() + "...生产者..."
                + this.name);
        flag = true;
        // 唤醒等待中的消费者
        this.notifyAll();//唤醒在此对象监视器上等待的所有线程    Object.notifyAll()
    }

    public synchronized void out() {
        // 消费资源
        while (!flag) {
            // 线程等待,生产者生产资源
            try {
                wait();
            } catch (Exception e) {
            }
        }
        System.out.println(Thread.currentThread().getName() + "...消费者..."
                + this.name);
        flag = false;
        // 唤醒生产者,生产资源
        this.notifyAll();
    }
}

// 生产者
class Producer implements Runnable {
    private Resource res;

    Producer(Resource res) {
        this.res = res;
    }

    // 生产者生产资源
    public void run() {
        while (true) {
            res.set("商品");
        }
    }
}

// 消费者消费资源
class Consumer implements Runnable {
    private Resource res;

    Consumer(Resource res) {
        this.res = res;
    }

    public void run() {
        while (true) {
            res.out();
        }
    }
}

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        Resource r = new Resource();
        Producer pro = new Producer(r);
        Consumer con = new Consumer(r);
        Thread t1 = new Thread(pro);
        Thread t2 = new Thread(con);
        t1.start();
        t2.start();
    }
}
View Code

2、ThreadLocal
  ThreadLocal提供一个线程的局部变量,访问某个线程拥有自己局部变量。当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
ThreadLocal的接口方法只有4个方法,先来了解一下:
?void set(Object value)设置当前线程的线程局部变量的值;
?public Object get()该方法返回当前线程所对应的线程局部变量;
?public void remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度;
?protected Object initialValue()返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null;
总的来说ThreadLocal就是一种以 空间换时间 的做法,在每个Thread里面维护了一个以开地址法实现的ThreadLocal.ThreadLocalMap,把数据进行隔离,数据不共享,自然就没有线程安全方面的问题了。
示例1

技术分享图片
package threadLearning.thredLocal;
/*
1、该类提供了线程局部 (thread-local) 变量。这些变量不同于它们的普通对应物,因为访问某个变量(通过其 get 或 set 方法)
的每个线程都有自己的局部变量,它独立于变量的初始化副本。ThreadLocal 实例通常是类中的 private static 字段,
它们希望将状态与某一个线程(例如:用户 ID 或事务 ID)相关联。
2、ThreadLocal的使用
(1) 在关联数据类中创建 private static ThreadLocal
在下面的类中,私有静态 ThreadLocal 实例(serialNum)为调用该类的静态 SerialNum.get() 方法的每个
线程维护了一个“序列号”,该方法将返回当前线程的序列号。(线程的序列号是在第一次调用 SerialNum.get() 时
分配的,并在后续调用中不会更改。 
每个线程都保持对其线程局部变量副本的隐式引用,只要线程是活动的并且 ThreadLocal 实例是可访问的;在线程消失之后,
其线程局部实例的所有副本都会被垃圾回收(除非存在对这些副本的其他引用)。 

*/ 
public class SerialNum {
    private static int nextSerialNum = 3;
    private static ThreadLocal serialNum = new ThreadLocal() {//创建一个线程本地变量
        protected synchronized Object initialValue() {
            return new Integer(nextSerialNum++);
        }
    };

    public static int get() {
        return ((Integer) (serialNum.get())).intValue();
    }
    
    public static void main(String args[]){
        Thread thead1=new Thread(new Runnable() {
            public void run() {
                System.out.println("thead1-->"+get());                
            }
        });
        Thread thead2=new Thread(new Runnable() {
            public void run() {
                System.out.println("thead2-->"+get());                
            }
        });
        thead1.start();
        thead2.start();
        /*
    同一个Thread启动第二次会报错java.lang.IllegalThreadStateExceptionThread报错的原因,并不是说,重新启动Thread导致的,
而是因为共用一个Thread导致的,因为,如果是实现Runnable的类,每次启动线程都需要new Thread(Runnable).start(),这就使得线
程没有被共用。
        while(true){
            thead2.start();
            
        }
        */
    }
}
View Code

示例2

技术分享图片
package threadLearning.thredLocal;
class Res {
    // 生成序列号共享变量
    public static Integer count = 0;
    public static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
        // 覆盖返回此线程局部变量的当前线程的“初始值”方法
        @Override
        protected Integer initialValue() {
            return 0;
        };
    };

    public Integer getNum() {
        int count = threadLocal.get() + 1;//get() 该方法返回当前线程所对应的线程局部变量
        threadLocal.set(count);//将此线程局部变量的当前线程副本中的值设置为指定值
        return count;
    }
}

public class ThreadLocaDemo2 extends Thread {
    private Res res;

    public ThreadLocaDemo2(Res res) {
        this.res = res;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            System.out.println(Thread.currentThread().getName() + "---" + "i---" + i + "--num:" + res.getNum());
        }

    }

    public static void main(String[] args) {
        Res res = new Res();
        ThreadLocaDemo2 threadLocaDemo1 = new ThreadLocaDemo2(res);
        ThreadLocaDemo2 threadLocaDemo2 = new ThreadLocaDemo2(res);
        ThreadLocaDemo2 threadLocaDemo3 = new ThreadLocaDemo2(res);
                      //各个线程都具有线程局部变量threadLocal值,不会出现线程安全问题
        threadLocaDemo1.start();
        threadLocaDemo2.start();
        threadLocaDemo3.start();
    }

}
View Code











以上是关于生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

golang生产者消费者模型示例代码

LINUX多线程(生产者消费者模型,POXIS信号量)

转: Java并发编程之十三:生产者—消费者模型(含代码)

Java生产消费者模型——代码解析

生产者消费者模型

生产者消费者模型-Java代码实现