JavaLearn#(16)多线程提升训练:生产者和消费者问题Lock锁ReadWriteLockBlockingQueuevolatile线程池线程同步练习

Posted LRcoding

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JavaLearn#(16)多线程提升训练:生产者和消费者问题Lock锁ReadWriteLockBlockingQueuevolatile线程池线程同步练习相关的知识,希望对你有一定的参考价值。

1. 生产者消费者扩展

1.1 多个生产者、多个消费者

由一个生产者、一个消费者、一个商品 ==》 扩展为多个生产者、多个消费者、多个商品

  • 最多有 10 个商品,最少有 0 个商品
  • 已经有 10 个商品后,生产者就不再生产,还要通知消费者进行消费
  • 没有商品时,消费者不再消费,还要通知生产者进行生产

生产者线程(任务)ProduceRunnable

public class ProduceRunnable implements Runnable
    private ProductFactory  factory;

    public ProduceRunnable() 
    

    // 使用构造方法的方式赋值,保证为同一个商品
    public ProduceRunnable(ProductFactory factory) 
        this.factory = factory;
    

    @Override
    public void run() 
        int i = 0;
        while (true) 
            try 
                Thread.sleep(1000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            // 生产一个商品
            factory.produce("生产商品" + i);
            i++;
        
    

消费者线程(任务)ConsumeRunnable

public class ConsumeRunnable implements Runnable
    private ProductFactory factory;

    public ProductFactory getProductFactory() 
        return factory;
    

    // 使用 set 方法的形式赋值
    public void setProductFactory(ProductFactory productFactory) 
        this.factory = productFactory;
    

    @Override
    public void run() 
        int i = 0;
        while (true) 
            try 
                Thread.sleep(1000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            // 消费一个商品
            factory.consume("消费一个商品" + i);
            i++;
        
    

ProductFactory商品工厂类

public class ProductFactory 
    /** 存储商品 */
    List<String> list = new LinkedList<>();
    int max = 10;

    /** 生产商品 */
    public synchronized void produce(String name) 
        // 仓库已满,停止生产
        while (list.size() == max)  // 使用 while 循环,当一直为满的状态时,一直等待
            try 
                this.wait();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        list.add(name);
        System.out.println(Thread.currentThread().getName() + "【生产】了商品,当前商品总数:" + list.size());

        // 生产满后,通知消费者
        if (list.size() == max) 
            this.notifyAll();   // 唤醒所有,保证肯定可以唤醒对方的线程
        
    

    /** 消费商品 */
    public synchronized void consume(String name) 
        // 仓库为空,就等待
        while (list.size() == 0) 
            try 
                this.wait();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        

        list.remove(0);
        System.out.println(Thread.currentThread().getName() + "【消费】了商品,当前商品总数:" + list.size());

        // 仓库为空,通知生产者
        if (list.size() == 0) 
            this.notifyAll();
        
    

测试类

public static void main(String[] args) 
    ProductFactory factory = new ProductFactory();

    // 创建 10 个生产者线程,并启动
    Runnable runnable1 = new ProduceRunnable(factory);
    for (int i = 0; i < 10; i++) 
        new Thread(runnable1, "生产者" + i).start();
    

    // 创建 20 个消费者线程,并启动
    ConsumeRunnable runnable2 = new ConsumeRunnable();
    runnable2.setProductFactory(factory);
    for (int i = 0; i < 20; i++) 
        new Thread(runnable2, "消费者" + i).start();
    

1.2 使用匿名内部类

可以将 ConsumeRunnable 和 ProduceRunnable 两个线程要做的任务类,在测试类中,直接写成匿名内部类的方式进行实现

  • 匿名内部类中使用的局部变量,必须为 final 的, JDK8中可以省略 final
public static void main(String[] args) 
    ProductFactory factory = new ProductFactory();

    // 创建 10 个生产者线程,并启动   【匿名内部类】
    Runnable runnable1 = new Runnable() 
        @Override
        public void run() 
            int i = 0;
            while (true) 
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                // 生产一个商品
                factory.produce("生产商品" + i);   // factory直接为本类的, 修饰为 final 的
                i++;
            
        
    ;
    for (int i = 0; i < 10; i++) 
        new Thread(runnable1, "生产者" + i).start();
    


    // 创建 20 个消费者线程,并启动  【匿名内部类】
    Runnable runnable2 = new Runnable() 
        @Override
        public void run() 
            int i = 0;
            while (true) 
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                // 消费一个商品
                factory.consume("消费一个商品" + i);   // factory直接为本类的
                i++;
            
        
    ;
    for (int i = 0; i < 20; i++) 
        new Thread(runnable2, "消费者" + i).start();
    

1.3 使用 Lock 锁

进一步优化:每次唤醒线程都是唤醒所有生产者和消费者( this.notifyAll() ) ==》 使用Lock锁 + Condition解决

public class ProductFactory 
    /** 存储商品 */
    List<String> list = new LinkedList<>();
    int max = 10;

    // !!! 使用 Lock 锁 + Condition
    Lock lock = new ReentrantLock();
    Condition produceCondition = lock.newCondition();
    Condition consumeCondition = lock.newCondition();

    /** 生产商品 */
    public void produce(String name) 
        lock.lock();
        try 
            // 仓库已满,停止生产
            while (list.size() == max) 
                try 
                    produceCondition.await();          // 生产者进入自己的等待队列
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            list.add(name);
            System.out.println(Thread.currentThread().getName() + "【生产】了商品,当前商品总数:" + list.size());

            // 生产一个后,就唤醒消费者
            consumeCondition.signal();                 // 唤醒消费者等待队列中的随机一个
         finally 
            lock.unlock();
        
    

    /** 消费商品 */
    public void consume(String name) 
        lock.lock();
        try 
            // 仓库为空,就等待
            while (list.size() == 0) 
                try 
                    consumeCondition.await();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            

            list.remove(0);
            System.out.println(Thread.currentThread().getName() + "【消费】了商品,当前商品总数:" + list.size());

            // 消费一个后,就通知生产者进行生产
            produceCondition.signal();
         finally 
            lock.unlock();
        
    

  • 使用 synchronized 不需要用户去手动释放锁,它是Java语言的关键字,当出现异常时,JVM会自动释放被占用的锁
  • Lock 是一个类,必须用户手动去释放锁,不然可能会出现死锁的现象
  • Condition 是 JDK1.5中出现的,使用 await(), signal()实现线程间协作更加安全和高效
    • 可以实现一个同步队列和多个等待队列,从而能够更精准的控制多线程的休眠与唤醒
    • 必须在 lock.lock() 和 lock.unlock() 之间才可以使用

2. 认识 Lock 锁API

2.1 Lock

  • 可重入锁:Lock 、ReadWriteLock 、synchronized 都是可重入锁(自己可进入自己的锁)
  • 独占锁和共享锁:WriteLock、ReentrantLock、synchronized 独占锁 ReadLock 共享锁
  • 公平锁和非公平锁
Lock lock = new ReentrantLock();
// 获取锁1: 拿不到锁就一直等待,拿到就执行后续代码
lock.lock();
// 获取锁2: 拿不到就返回false,拿到就返回true
lock.tryLock();
// 获取锁3: 如果拿不到锁,就尝试指定的时间,时间到了还是没拿到,才放弃
lock.tryLock(10, TimeUnit.MICROSECONDS);
// 获取锁4: 拿不到锁就一直等待,中途可以被其他线程中断
lock.lockInterruptibly();

// 解锁
lock.unlock();

// 创建一个等待队列
lock.newCondition();

2.2 ReadWriteLock

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
// 写锁
Lock readLock = rwLock.readLock();
Lock readLock1 = rwLock.readLock();
System.out.println(readLock1 == readLock);     // !!! 返回的是【同一把】锁
// 读锁
Lock writeLock = rwLock.writeLock();

实例:多个读操作可以同时进行,读写操作、写写操作都是互斥的

public class TestReadWriteLock 
    public static void main(String[] args) 
        final Operator operator = new Operator();
        // 创建 5 个读线程并启动
        Runnable readRunnable = new Runnable() 
            @Override
            public void run() 
                operator.read();
            
        ;
        for (int i = 0; i < 10; i++) 
            new Thread(readRunnable, "读线程" + i).start(); // 5个线程同时进行一个读操作
        

        // 创建 5 个写线程并启动
        Runnable writeRunnable = new Runnable() 
            @Override
            public void run() 
                operator.write();
            
        ;
        for (int i = 0; i < 10; i++) 
            new Thread(writeRunnable, "写线程" + i).start();
        
    


class Operator 

    // private Lock lock = new ReentrantLock();     !!! 使用 Lock 会导致读线程进入后,必须执行完才能执行其他的

    private ReadWriteLock rwLock = new ReentrantReadWriteLock();   // !!! 使用 ReadWriteLock 可以达到多个读同时进行,写操作互斥

    public void read() 
        // lock.lock();
        rwLock.readLock().lock();
        try 
            System.out.println(Thread.currentThread().getName() + "读数据【开始】!");
            try 
                Thread.sleep(10);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println(Thread.currentThread().getName() + "读数据【结束】!");
         finally 
            // lock.unlock();
            rwLock.readLock().unlock();
        
    

    public void write() 
        // lock.lock();
        rwLock.writeLock().lock();
        try 
            System.out.println(Thread.currentThread().getName() + "写数据【开始】!");
            try 
                Thread.sleep(10);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println(Thread.currentThread().getName() + "写数据【结束】!");
         finally 
            // lock.unlock();
            rwLock.writeLock().unlock();
        
    

3. BlockingQueue

BlockingQueue即阻塞队列,位于 java.util.concurrent (JDK5)包中,被阻塞的情况主要有如下两种:

  • 当队列满了的时候,进行入队操作
  • 当队列空了的时候,进行出队操作

BlockingQueue 的方法

可能抛出异常的操作特殊值可能会产生阻塞的操作超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()

使用 BlockingQueue 实现生产者消费者问题

public class ProductFactory 
    /** 存储商品 */
    BlockingQueue list = new ArrayBlockingQueue(10);    // 使用 BlockQueue 实现

    /** 生产商品 */
    public void produce(String name) 
        try 
            list.put(name);           // 放入值,满了之后再放会阻塞
         catch (InterruptedException e) 
            e.printStackTrace();
        
        System.out.println(Thread.currentThread()以上是关于JavaLearn#(16)多线程提升训练:生产者和消费者问题Lock锁ReadWriteLockBlockingQueuevolatile线程池线程同步练习的主要内容,如果未能解决你的问题,请参考以下文章

JavaLearn#(15)集合提升训练:手写ArrayList单链表LinkedListHashMapHashSet新一代并发集合类

JavaLearn#(15)集合提升训练:手写ArrayList单链表LinkedListHashMapHashSet新一代并发集合类

JavaLearn# (13)多线程:线程生命周期线程控制线程同步线程通信线程池ForkJoin框架

用阻塞队列实现生产者消费者模式二(多线程消费)

java 22 - 16 多线程之生产者和消费者的问题

云原生训练营模块二 Go语言进阶