基于ReentrantLock实现简易生产者消费者模型

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于ReentrantLock实现简易生产者消费者模型相关的知识,希望对你有一定的参考价值。

目录

生产者、消费者模型

ArrayBlockingQueue


生产者、消费者模型

        实际开发中,我们可能会需要一个队列场景,往队列投入资源的为生产者,往队列中拿元素的为消费者,那么ReentrantLock能帮助我们实现这种生产者、消费者模型。

        实现原理: 我们可以借助ReentrantLock定义两个信号量: notFull和notEmpty, notFull.await()阻塞生产者投放资源,同时提醒消费者可以拿资源,notFull.signal() 用来提醒生产者可以继续投放资源 ; notEmpty.await用来阻塞消费者拿资源并提醒生产者往队列里投放资源。

        注:  在多线程情况下, 因为生产者投放资源和消费者获取资源不是线程安全的,所以需要在run()方法里使用lock()方法给同步块加锁,在Finally释放锁。

package com.example.jucdemo.lock;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 生产者消费者模型
 *
 * @param <E>
 */
public class ProviderConsumerDemo<E> 


    private static int queueSize = 10;

    private LinkedBlockingDeque<Node<E>> blockingDeque = new LinkedBlockingDeque<>(queueSize);

    public LinkedBlockingDeque<Node<E>> getQueue() 
        return blockingDeque;
    

    private static ReentrantLock lock = new ReentrantLock();

    /**
     * 定义信号量 notFull,给生产者使用,没有满,那么可以继续投放元素
     */
    private static Condition notFull = lock.newCondition();

    /**
     * 定义信号量notEmpty
     */
    private static Condition notEmpty = lock.newCondition();


    public static void main(String[] args) 
        ProviderConsumerDemo<Integer> providerConsumerDemo = new ProviderConsumerDemo<>();
        LinkedBlockingDeque<Node<Integer>> blockingDeque = providerConsumerDemo.getQueue();
        for (int i = 0; i < 10; i++) 
            new Thread(new Provider(blockingDeque)).start();
        

        for (int j = 0; j < 10; j++) 
            new Thread(new Consumer(blockingDeque)).start();
        

    


    static class Node<E> 

        private E item;

        private Node<E> next;

        public Node(E item) 
            this.item = item;
        

        public void setNext(Node<E> next) 
            this.next = next;
        


        public String traverse() 
            String currentValue = this.item.toString();
            return this.next == null ? currentValue : this.next.traverse();
        

        @Override
        public String toString() 
            return "" + this.item;
        
    

    /**
     * 队列没满就往队列里添加元素
     */
    static class Provider implements Runnable 

        private LinkedBlockingDeque<Node<Integer>> blockingDeque;

        public Provider(LinkedBlockingDeque<Node<Integer>> blockingDeque) 
            this.blockingDeque = blockingDeque;
        

        @Override
        public void run() 
            lock.lock();
            try 
                /**
                 *  队列满了,那么就停止投放
                 */
                while (blockingDeque.size() == queueSize) 
                    notFull.await();
                
                Node<Integer> node = new Node<>((int) (Math.random() * 20));
                blockingDeque.add(node);
                System.out.println(Thread.currentThread().getName() + "往队列中投入一个元素:" + node.item.toString());
                /**
                 * 队列里有元素了,可以唤醒阻塞线程拿元素
                 */
                notEmpty.signal();
             catch (InterruptedException e) 
                e.printStackTrace();
             finally 
                lock.unlock();
            

        
    


    /**
     * 队列里有元素,那么就从队列里拿出一个元素
     */
    static class Consumer implements Runnable 

        private LinkedBlockingDeque<Node<Integer>> blockingDeque;

        public Consumer(LinkedBlockingDeque<Node<Integer>> blockingDeque) 
            this.blockingDeque = blockingDeque;
        

        @Override
        public void run() 
            lock.lock();
            try 
                /**
                 *  等待投放
                 */
                while (blockingDeque.isEmpty()) 
                    notEmpty.await();
                
                consumer(blockingDeque);
                /**
                 *  可继续投放
                 */
                notFull.signal();
             catch (InterruptedException e) 
                e.printStackTrace();
             finally 
                lock.unlock();
            
        

        private void consumer(LinkedBlockingDeque<Node<Integer>> blockingDeque) 
            System.out.println(Thread.currentThread().getName() + "从队列中拿到一个元素:" + blockingDeque.poll());
        
    

打印结果: 

Thread-0往队列中投入一个元素:11
Thread-16从队列中拿到一个元素:11
Thread-1往队列中投入一个元素:19
Thread-2往队列中投入一个元素:1
Thread-4往队列中投入一个元素:14
Thread-5往队列中投入一个元素:6
Thread-3往队列中投入一个元素:10
Thread-6往队列中投入一个元素:10
Thread-7往队列中投入一个元素:10
Thread-8往队列中投入一个元素:0
Thread-9往队列中投入一个元素:11
Thread-10从队列中拿到一个元素:19
Thread-11从队列中拿到一个元素:1
Thread-12从队列中拿到一个元素:14
Thread-13从队列中拿到一个元素:6
Thread-14从队列中拿到一个元素:10
Thread-15从队列中拿到一个元素:10
Thread-17从队列中拿到一个元素:10
Thread-18从队列中拿到一个元素:0
Thread-19从队列中拿到一个元素:11

Process finished with exit code 0

我们可以将程序多执行几次,观察结果的差异,但最终的元素进出队列是守恒的。

ArrayBlockingQueue

Java.util.concurrent包下的ArrayBlockingQueue就是基于ReetrantLock实现同步的。

ArrayBlockingQueue提供阻塞和非阻塞的存取元素两种方式:

1. offer()方法和poll()方法是非阻塞的。

2. put()方法和take()方法是阻塞的。


put方法, 队列满了等待空了再投放:

 public void put(E e) throws InterruptedException 
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
            while (count == items.length)
                notFull.await();
            enqueue(e);
         finally 
            lock.unlock();
        
    

take()方法,队列没有元素,等待队列投放元素再取出

 public E take() throws InterruptedException 
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
            while (count == 0)
                notEmpty.await();
            return dequeue();
         finally 
            lock.unlock();
        
    

 ArrayblcokingQueue示例代码:

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockedQueueDemo 
    /**
     * 1. ArrayBlockQueue元素投放时线程安全的,因为使用了ReentrantLock。
     * 2. 如果使用offer()方法投放的元素个数超过了队列的大小,那么会报错: queue full
     * 3. offer()方法和poll()方法时非阻塞的,但put()方法和take()方法是阻塞的,即当队列满时,那么会等待队列有位置才能投放元素,当列表里没有元素时,那么线程会等待元素投放。
     */
    private static ArrayBlockingQueue<Integer> arrayBlockedQueue = new ArrayBlockingQueue<>(5);


    public static void main(String[] args) 

        for (int j = 0; j < 10; j++) 
            new Thread(new Consumer()).start();
        

        for (int i = 0; i < 10; i++) 
            new Thread(new Provider()).start();
        


    

    static class Provider implements Runnable 

        @Override
        public void run() 
            int element = (int) (Math.random() * 10);
            try 
                arrayBlockedQueue.put(element);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println(Thread.currentThread().getName() + "投放一个元素:" + element);
        
    

    static class Consumer implements Runnable 

        @Override
        public void run() 
            try 
                System.out.println(Thread.currentThread().getName() + "取出一个元素:" + arrayBlockedQueue.take());
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    

打印结果:

Thread-13投放一个元素:7
Thread-10投放一个元素:2
Thread-14投放一个元素:1
Thread-11投放一个元素:7
Thread-15投放一个元素:5
Thread-0取出一个元素:7
Thread-16投放一个元素:3
Thread-2取出一个元素:2
Thread-1取出一个元素:7
Thread-12投放一个元素:6
Thread-7取出一个元素:7
Thread-17投放一个元素:7
Thread-6取出一个元素:6
Thread-5取出一个元素:3
Thread-4取出一个元素:5
Thread-18投放一个元素:2
Thread-9取出一个元素:2
Thread-3取出一个元素:1
Thread-19投放一个元素:8
Thread-8取出一个元素:8

Process finished with exit code 0

以上是关于基于ReentrantLock实现简易生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

java ReentrantLock结合条件队列 实现生产者-消费者模式

[linux环境] 基于thrift模拟游戏的简易匹配机制

简易版的生产者消费者实现业务异步事务分离

java——利用生产者消费者模式思想实现简易版handler机制

java并发之ReentrantLock.Condition实现原理

多线程:简易版本生产消费者模式纯语言概述