Java多线程-----实现生产者消费者模式的几种方式

Posted fengfuwanliu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程-----实现生产者消费者模式的几种方式相关的知识,希望对你有一定的参考价值。

   1 生产者消费者模式概述

     生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,

直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

     在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,

才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式

   2 实现生产者消费者模式

   产品类

package com.thread.pc.blockingqueue;

import java.util.UUID;

/**
 * 产品类
 * 
 * @author yyx 2018年12月22日
 */
public class Product {
    private UUID proCode; // 产品唯一编码

    public Product(UUID proCode) {
        super();
        this.proCode = proCode;
    }

    public UUID getProCode() {
        return proCode;
    }

    public void setProCode(UUID proCode) {
        this.proCode = proCode;
    }

}

   生产者

package com.thread.pc.blockingqueue;

/**
 * 生产者
 * @author yyx 2018年12月22日
 */
public class Producer implements Runnable {
    private Warehouse warehouse;

    public Producer(Warehouse warehouse) {
        super();
        this.warehouse = warehouse;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            warehouse.addProduct();
        }
    }

}

   消费者

package com.thread.pc.blockingqueue;
/**
 * 消费者
 * @author yyx 2018年12月22日
 */
public class Consumer implements Runnable {
    private Warehouse warehouse;

    public Consumer(Warehouse warehouse) {
        super();
        this.warehouse = warehouse;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            warehouse.removeProduct();
        }
    }
}

2.1 使用lock、condition和await、singalAll

   仓库类

package com.thread.pc.lockcondition;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 仓库类
 * 
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private List<Product> listProduct;
    private Lock lock;
    private Condition producerCondition;
    private Condition consumerCondition;

    public Warehouse(List<Product> listProduct, Lock lock, Condition producerCondition, Condition consumerCondition) {
        super();
        this.listProduct = listProduct;
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }

    public void addProduct() {
        lock.lock();
        try {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() >= MAX_SIZE) {
                try {
                    System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
                    producerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = new Product(UUID.randomUUID());
                System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.add(product);
                consumerCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    public void removeProduct() {
        lock.lock();
        try {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() <= 0) {
                try {
                    System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
                    consumerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = listProduct.get(0);
                System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.remove(0);
                producerCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

   测试类

package com.thread.pc.lockcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 测试类
 * @author 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> listProduct=new ArrayList<Product>();
        Lock lock = new ReentrantLock();
        Condition producerCondition = lock.newCondition();
        Condition consumerCondition = lock.newCondition();
        
        Warehouse warehouse = new Warehouse(listProduct,lock, producerCondition, consumerCondition);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);
        
        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

2.2 使用synchronized修饰方法

   仓库类

package com.thread.pc.synchronizedmethod;

import java.util.List;
import java.util.UUID;

/**
 * 仓库类
 * 
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private List<Product> listProduct;

    public Warehouse(List<Product> listProduct) {
        super();
        this.listProduct = listProduct;
    }

    public void addProduct() {
        synchronized (listProduct) {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() >= MAX_SIZE) {
                try {
                    System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
                    listProduct.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = new Product(UUID.randomUUID());
                System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.add(product);
                listProduct.notifyAll();
            }
        }
    }

    public void removeProduct() {
        synchronized (listProduct) {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() <= 0) {
                try {
                    System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
                    listProduct.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = listProduct.get(0);
                System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.remove(0);
                listProduct.notifyAll();
            }
        }
    }
}

   测试类

package com.thread.pc.synchronizedmethod;

import java.util.ArrayList;
import java.util.List;

/**
 * 测试类
 * 
 * @author yyx 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> listProduct = new ArrayList<Product>();

        Warehouse warehouse = new Warehouse(listProduct);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
        new Thread(consumer).start();
    }
}

2.3 使用synchronized修饰代码块

   仓库类

package com.thread.pc.synchronizedcodeblock;

import java.util.List;
import java.util.UUID;
/**
 * 仓库类
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private List<Product> listProduct;

    public Warehouse(List<Product> listProduct) {
        super();
        this.listProduct = listProduct;
    }

    public synchronized void addProduct() {
        String currentName = Thread.currentThread().getName();
        if (listProduct.size() >= MAX_SIZE) {
            try {
                System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            Product product = new Product(UUID.randomUUID());
            System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
            listProduct.add(product);
            notifyAll();
        }
    }

    public synchronized void removeProduct() {
        String currentName = Thread.currentThread().getName();
        if (listProduct.size() <= 0) {
            try {
                System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            Product product = listProduct.get(0);
            System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
            listProduct.remove(0);
            notifyAll();
        }
    }
}

   测试类

package com.thread.pc.synchronizedcodeblock;

import java.util.ArrayList;
import java.util.List;
/**
 * 测试类
 * @author yyx 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> listProduct=new ArrayList<Product>();
        
        Warehouse warehouse = new Warehouse(listProduct);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);
        
        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

2.4 使用BlockingQueue

   仓库类

package com.thread.pc.blockingqueue;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
 * 仓库
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private BlockingQueue<Product> blockingQueue;

    public Warehouse(BlockingQueue<Product> blockingQueue) {
        super();
        this.blockingQueue = blockingQueue;
    }

    public void addProduct() {
        String currentName = Thread.currentThread().getName();
        if (blockingQueue.size() >= MAX_SIZE) {
            System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
        } else {
            Product product = new Product(UUID.randomUUID());
            System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
            try {
                blockingQueue.put(product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void removeProduct() {
        String currentName = Thread.currentThread().getName();
        if (blockingQueue.size() <= 0) {
            System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
        } else {
            try {
                Product product = blockingQueue.take();
                System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

   测试类

package com.thread.pc.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * 测试类
 * @author yyx 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        BlockingQueue<Product> blockingQueue = new LinkedBlockingQueue<>(10);
        
        Warehouse warehouse = new Warehouse(blockingQueue);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);
        
        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

 

以上是关于Java多线程-----实现生产者消费者模式的几种方式的主要内容,如果未能解决你的问题,请参考以下文章

Java的多线程实现生产/消费模式

java 多线程并发系列之 生产者消费者模式的两种实现

C++多线程怎么实现

Java多线程-生产者/消费者模式实现

java 多线程 22 :生产者/消费者模式 进阶 利用await()/signal()实现

java中的多线程的实现生产者消费者模式