生产者/消费者问题的多种Java实现方式

Posted huxipeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者/消费者问题的多种Java实现方式相关的知识,希望对你有一定的参考价值。

(1)wait() / notify()方法

(2)await() / signal()方法

(3)BlockingQueue阻塞队列方法

(4)PipedInputStream / PipedOutputStream

本文只介绍最常用的前三种,第四种暂不做讨论

第一种:BlockingQueue阻塞队列方法

  1 class Task {
  2 
  3     private int id;
  4     private int value;
  5 
  6     public int getId() {
  7         return id;
  8     }
  9 
 10     public void setId(int id) {
 11         this.id = id;
 12     }
 13 
 14     public int getValue() {
 15         return value;
 16     }
 17 
 18     public void setValue(int value) {
 19         this.value = value;
 20     }
 21 }
 22 
 23 /**
 24  * 生产者
 25  */
 26 class Provider implements Runnable{
 27 
 28     private BlockingQueue<Task> queue;
 29 
 30     private static AtomicInteger newId = new AtomicInteger();
 31 
 32     private static Random random = new Random();
 33 
 34     private volatile boolean isRunning = true;
 35 
 36     public Provider(BlockingQueue<Task> queue) {
 37         this.queue = queue;
 38     }
 39 
 40     @Override
 41     public void run() {
 42         while (isRunning){
 43             try {
 44                 Thread.sleep(random.nextInt(1000));
 45                 int id = newId.incrementAndGet();
 46                 Task task = new Task();
 47                 task.setId(id);
 48                 task.setValue(random.nextInt(1000));
 49                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
 50                 this.queue.put(task);
 51             } catch (Exception e){
 52                 e.printStackTrace();
 53             }
 54         }
 55     }
 56 
 57     public void stop() {
 58         this.isRunning = false;
 59     }
 60 }
 61 
 62 /**
 63  * 消费者
 64  */
 65 class Consumer implements Runnable{
 66 
 67     private BlockingQueue<Task> queue;
 68 
 69     public Consumer(BlockingQueue queue) {
 70         this.queue = queue;
 71     }
 72 
 73     @Override
 74     public void run() {
 75         while (true){
 76             try {
 77                 Task task = queue.take();
 78                 Thread.sleep(new Random().nextInt(1000));
 79                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + task.getId());
 80             } catch (Exception e) {
 81                  e.printStackTrace();
 82             }
 83         }
 84 
 85     }
 86 }
 87 
 88 public static void main(String[] args) {
 89 
 90         ArrayBlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);
 91         Provider provider = new Provider(queue);
 92         Provider provider1 = new Provider(queue);
 93         Provider provider2 = new Provider(queue);
 94 
 95         Consumer consumer = new Consumer(queue);
 96         Consumer consumer1 = new Consumer(queue);
 97         Consumer consumer2 = new Consumer(queue);
 98 
 99         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, Integer.MAX_VALUE,
100                 60L, TimeUnit.SECONDS,
101                 new ArrayBlockingQueue<Runnable>(12));
102         executor.execute(provider);
103         executor.execute(provider1);
104         executor.execute(provider2);
105         executor.execute(consumer);
106         executor.execute(consumer1);
107         executor.execute(consumer2);
108 
109 
110         try {
111             Thread.sleep(3000);
112         } catch (Exception e) {
113             e.printStackTrace();
114         }
115         provider.stop();
116         provider1.stop();
117         provider2.stop();
118 
119         try {
120             Thread.sleep(2000);
121         } catch (InterruptedException e) {
122             e.printStackTrace();
123         }
124     }

运行结果:

当前线程:pool-1-thread-2, 获取了数据,id为:1, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-1, 获取了数据,id为:2, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:3, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:4, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:5, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 1
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 2
当前线程:pool-1-thread-2, 获取了数据,id为:6, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-1, 获取了数据,id为:7, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:8, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 4
当前线程:pool-1-thread-1, 获取了数据,id为:9, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 3
当前线程:pool-1-thread-3, 获取了数据,id为:10, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:11, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 6
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 5
当前线程:pool-1-thread-3, 获取了数据,id为:12, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-1, 获取了数据,id为:13, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:14, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:15, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:16, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 8
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 10
当前线程:pool-1-thread-1, 获取了数据,id为:17, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 7
当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 9
当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 13
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 11
当前线程:pool-1-thread-2, 获取了数据,id为:18, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 14
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 12
当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 16
当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 18
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 15
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 17

 第二种:wait() / notify()方法

  1 /**
  2  * 仓库
  3  */
  4 class Storage {
  5 
  6     private LinkedList list = new LinkedList(); // 存放产品的
  7 
  8     private final int MAX_NUM = 100;
  9 
 10 
 11     public void product(int num) {
 12 
 13         synchronized (list) {
 14 
 15             while (list.size() + num > MAX_NUM) {
 16                 try {
 17                     list.wait();
 18                 } catch (Exception e) {
 19                     e.printStackTrace();
 20                 }
 21                 System.out.println("库存达到最大,不能生产");
 22 
 23             }
 24 
 25             // 说明库存是够得,那么生产
 26             for (int i = 0; i < num; i++) {
 27                 list.add(new Object());
 28             }
 29 
 30             try {
 31                 Thread.sleep(2000);
 32             } catch (Exception e) {
 33                 e.printStackTrace();
 34             }
 35 
 36             System.out.println(Thread.currentThread().getName() + "生产完成,现在库存" + list.size());
 37 
 38             list.notifyAll();
 39 
 40         }
 41     }
 42 
 43     public void consume(int num) {
 44 
 45         synchronized (list) {
 46 
 47             while (list.size() < num) {
 48                 System.out.println("库存不足");
 49                 try {
 50                     list.wait();
 51                 } catch (Exception e) {
 52                     e.printStackTrace();
 53                 }
 54             }
 55 
 56             for (int i = 0; i < num; i++) {
 57                 list.remove();
 58             }
 59             try {
 60                 Thread.sleep(1000);
 61             } catch (Exception e) {
 62                 e.printStackTrace();
 63             }
 64 
 65             System.out.println(Thread.currentThread().getName() + "消费完成"+ num +",现在库存" + list.size());
 66             list.notifyAll();
 67         }
 68     }
 69 
 70     public LinkedList getList() {
 71         return list;
 72     }
 73 
 74     public void setList(LinkedList list) {
 75         this.list = list;
 76     }
 77 
 78 }
 79 
 80 class Producter1 implements Runnable {
 81 
 82     private int num;
 83     private Storage storage;
 84 
 85     public Producter1(int num, Storage storage) {
 86         this.num = num;
 87         this.storage = storage;
 88     }
 89 
 90     @Override
 91     public void run() {
 92         storage.product(num);
 93     }
 94 
 95     public int getNum() {
 96         return num;
 97     }
 98 
 99     public void setNum(int num) {
100         this.num = num;
101     }
102 
103     public Storage getStorage() {
104         return storage;
105     }
106 
107     public void setStorage(Storage storage) {
108         this.storage = storage;
109     }
110 }
111 
112 class Consumer1 implements Runnable {
113 
114     private int num;
115 
116     private Storage storage;
117 
118     public Consumer1(int num, Storage storage) {
119         this.num = num;
120         this.storage = storage;
121     }
122 
123     @Override
124     public void run() {
125         storage.consume(num);
126     }
127 
128     public int getNum() {
129         return num;
130     }
131 
132     public void setNum(int num) {
133         this.num = num;
134     }
135 
136     public Storage getStorage() {
137         return storage;
138     }
139 
140     public void setStorage(Storage storage) {
141         this.storage = storage;
142     }
143 }
144 
145 public static void main(String[] args) {
146 
147         Storage storage = new Storage();
148 
149         Producter1 producter = new Producter1(10,storage);
150         Producter1 producter1 = new Producter1(10,storage);
151         Producter1 producter2 = new Producter1(10,storage);
152         Producter1 producter3 = new Producter1(10,storage);
153         Producter1 producter4 = new Producter1(10,storage);
154         Producter1 producter5 = new Producter1(10,storage);
155         Producter1 producter6 = new Producter1(10,storage);
156 
157         Consumer1 consumer1 = new Consumer1(20,storage);
158         Consumer1 consumer2 = new Consumer1(30,storage);
159         Consumer1 consumer3 = new Consumer1(20,storage);
160 
161         ThreadPoolExecutor executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE,
162                 60L, TimeUnit.SECONDS,
163                 new ArrayBlockingQueue<Runnable>(12));
164 
165         executor.execute(producter);
166         executor.execute(producter1);
167         executor.execute(producter2);
168         executor.execute(producter3);
169         executor.execute(producter4);
170         executor.execute(producter5);
171         executor.execute(producter6);
172         executor.execute(consumer1);
173         executor.execute(consumer2);
174         executor.execute(consumer3);
175     }

运行结果:

pool-1-thread-1生产完成,现在库存10
pool-1-thread-3生产完成,现在库存20
pool-1-thread-7生产完成,现在库存30
pool-1-thread-9消费完成30,现在库存0
pool-1-thread-5生产完成,现在库存10
库存不足
pool-1-thread-6生产完成,现在库存20
pool-1-thread-4生产完成,现在库存30
pool-1-thread-10消费完成20,现在库存10
pool-1-thread-2生产完成,现在库存20
pool-1-thread-8消费完成20,现在库存0

 第三种:await() / signal()方法

class Storage1 {

    private final int MAX_NUM = 100;

    private LinkedList list = new LinkedList();

    private final Lock lock = new ReentrantLock();

    private final Condition notEmpty = lock.newCondition();

    private final Condition notFull = lock.newCondition();

    public void product(int num) {

        lock.lock();

        try {
            while (list.size() + num > MAX_NUM) {
                notFull.await();
                System.out.println("库存达到最大,不能生产");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        // 说明库存是够得,那么生产
        for (int i = 0; i < num; i++) {
            list.add(new Object());
        }

        try {
           // Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + "生产完成,现在库存" + list.size());
        notFull.signalAll();
        notEmpty.signalAll();

        lock.unlock();

    }

    public void consume(int num) {

        lock.lock();

        try {
            while (list.size() < num) {
                notEmpty.await();
                System.out.println("库存不足");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        for (int i = 0; i < num; i++) {
            list.remove();
        }
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + "消费完成"+ num +",现在库存" + list.size());

        notFull.signalAll();
        notEmpty.signalAll();

        lock.unlock();
    }

}
class Producter2 implements Runnable {

    private int num;
    private Storage1 storage;

    public Producter2(int num, Storage1 storage) {
        this.num = num;
        this.storage = storage;
    }

    @Override
    public void run() {
        storage.product(num);
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public Storage1 getStorage() {
        return storage;
    }

    public void setStorage(Storage1 storage) {
        this.storage = storage;
    }
}

class Consumer2 implements Runnable {

    private int num;

    private Storage1 storage;

    public Consumer2(int num, Storage1 storage) {
        this.num = num;
        this.storage = storage;
    }

    @Override
    public void run() {
        storage.consume(num);
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public Storage1 getStorage() {
        return storage;
    }

    public void setStorage(Storage1 storage) {
        this.storage = storage;
    }
}

public static void main(String[] args) {
        Storage1 storage = new Storage1();

        Producter2 producter = new Producter2(10,storage);
        Producter2 producter1 = new Producter2(10,storage);
        Producter2 producter2 = new Producter2(10,storage);
        Producter2 producter3 = new Producter2(10,storage);
        Producter2 producter4 = new Producter2(10,storage);
        Producter2 producter5 = new Producter2(10,storage);
        Producter2 producter6 = new Producter2(10,storage);

        Consumer2 consumer1 = new Consumer2(20,storage);
        Consumer2 consumer2 = new Consumer2(30,storage);
        Consumer2 consumer3 = new Consumer2(20,storage);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(12));

        executor.execute(producter);
        executor.execute(producter1);
        executor.execute(producter2);
        executor.execute(producter3);
        executor.execute(producter4);
        executor.execute(producter5);
        executor.execute(producter6);
        executor.execute(consumer1);
        executor.execute(consumer2);
        executor.execute(consumer3);
    }

运行结果:

pool-1-thread-1生产完成,现在库存10
pool-1-thread-3生产完成,现在库存20
pool-1-thread-4生产完成,现在库存30
pool-1-thread-5生产完成,现在库存40
pool-1-thread-9消费完成30,现在库存10
pool-1-thread-7生产完成,现在库存20
pool-1-thread-8消费完成20,现在库存0
pool-1-thread-2生产完成,现在库存10
pool-1-thread-6生产完成,现在库存20
pool-1-thread-10消费完成20,现在库存0

 






























































以上是关于生产者/消费者问题的多种Java实现方式的主要内容,如果未能解决你的问题,请参考以下文章

java线程之间通信,多种方式实现生产者消费者模式

kafka生产者多种实现方式

java实现生产者和消费者问题的几种方式

Carson带你学Java:解决生产者消费者问题的五种实现方式

Carson带你学Java:解决生产者消费者问题的五种实现方式

Carson带你学Java:解决生产者消费者问题的五种实现方式