基于ReentrantLock实现简易生产者消费者模型
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于ReentrantLock实现简易生产者消费者模型相关的知识,希望对你有一定的参考价值。
目录
生产者、消费者模型
实际开发中,我们可能会需要一个队列场景,往队列投入资源的为生产者,往队列中拿元素的为消费者,那么ReentrantLock能帮助我们实现这种生产者、消费者模型。
实现原理: 我们可以借助ReentrantLock定义两个信号量: notFull和notEmpty, notFull.await()阻塞生产者投放资源,同时提醒消费者可以拿资源,notFull.signal() 用来提醒生产者可以继续投放资源 ; notEmpty.await用来阻塞消费者拿资源并提醒生产者往队列里投放资源。
注: 在多线程情况下, 因为生产者投放资源和消费者获取资源不是线程安全的,所以需要在run()方法里使用lock()方法给同步块加锁,在Finally释放锁。
package com.example.jucdemo.lock;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 生产者消费者模型
*
* @param <E>
*/
public class ProviderConsumerDemo<E>
private static int queueSize = 10;
private LinkedBlockingDeque<Node<E>> blockingDeque = new LinkedBlockingDeque<>(queueSize);
public LinkedBlockingDeque<Node<E>> getQueue()
return blockingDeque;
private static ReentrantLock lock = new ReentrantLock();
/**
* 定义信号量 notFull,给生产者使用,没有满,那么可以继续投放元素
*/
private static Condition notFull = lock.newCondition();
/**
* 定义信号量notEmpty
*/
private static Condition notEmpty = lock.newCondition();
public static void main(String[] args)
ProviderConsumerDemo<Integer> providerConsumerDemo = new ProviderConsumerDemo<>();
LinkedBlockingDeque<Node<Integer>> blockingDeque = providerConsumerDemo.getQueue();
for (int i = 0; i < 10; i++)
new Thread(new Provider(blockingDeque)).start();
for (int j = 0; j < 10; j++)
new Thread(new Consumer(blockingDeque)).start();
static class Node<E>
private E item;
private Node<E> next;
public Node(E item)
this.item = item;
public void setNext(Node<E> next)
this.next = next;
public String traverse()
String currentValue = this.item.toString();
return this.next == null ? currentValue : this.next.traverse();
@Override
public String toString()
return "" + this.item;
/**
* 队列没满就往队列里添加元素
*/
static class Provider implements Runnable
private LinkedBlockingDeque<Node<Integer>> blockingDeque;
public Provider(LinkedBlockingDeque<Node<Integer>> blockingDeque)
this.blockingDeque = blockingDeque;
@Override
public void run()
lock.lock();
try
/**
* 队列满了,那么就停止投放
*/
while (blockingDeque.size() == queueSize)
notFull.await();
Node<Integer> node = new Node<>((int) (Math.random() * 20));
blockingDeque.add(node);
System.out.println(Thread.currentThread().getName() + "往队列中投入一个元素:" + node.item.toString());
/**
* 队列里有元素了,可以唤醒阻塞线程拿元素
*/
notEmpty.signal();
catch (InterruptedException e)
e.printStackTrace();
finally
lock.unlock();
/**
* 队列里有元素,那么就从队列里拿出一个元素
*/
static class Consumer implements Runnable
private LinkedBlockingDeque<Node<Integer>> blockingDeque;
public Consumer(LinkedBlockingDeque<Node<Integer>> blockingDeque)
this.blockingDeque = blockingDeque;
@Override
public void run()
lock.lock();
try
/**
* 等待投放
*/
while (blockingDeque.isEmpty())
notEmpty.await();
consumer(blockingDeque);
/**
* 可继续投放
*/
notFull.signal();
catch (InterruptedException e)
e.printStackTrace();
finally
lock.unlock();
private void consumer(LinkedBlockingDeque<Node<Integer>> blockingDeque)
System.out.println(Thread.currentThread().getName() + "从队列中拿到一个元素:" + blockingDeque.poll());
打印结果:
Thread-0往队列中投入一个元素:11
Thread-16从队列中拿到一个元素:11
Thread-1往队列中投入一个元素:19
Thread-2往队列中投入一个元素:1
Thread-4往队列中投入一个元素:14
Thread-5往队列中投入一个元素:6
Thread-3往队列中投入一个元素:10
Thread-6往队列中投入一个元素:10
Thread-7往队列中投入一个元素:10
Thread-8往队列中投入一个元素:0
Thread-9往队列中投入一个元素:11
Thread-10从队列中拿到一个元素:19
Thread-11从队列中拿到一个元素:1
Thread-12从队列中拿到一个元素:14
Thread-13从队列中拿到一个元素:6
Thread-14从队列中拿到一个元素:10
Thread-15从队列中拿到一个元素:10
Thread-17从队列中拿到一个元素:10
Thread-18从队列中拿到一个元素:0
Thread-19从队列中拿到一个元素:11
Process finished with exit code 0
我们可以将程序多执行几次,观察结果的差异,但最终的元素进出队列是守恒的。
ArrayBlockingQueue
Java.util.concurrent包下的ArrayBlockingQueue就是基于ReetrantLock实现同步的。
ArrayBlockingQueue提供阻塞和非阻塞的存取元素两种方式:
1. offer()方法和poll()方法是非阻塞的。
2. put()方法和take()方法是阻塞的。
put方法, 队列满了等待空了再投放:
public void put(E e) throws InterruptedException
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == items.length)
notFull.await();
enqueue(e);
finally
lock.unlock();
take()方法,队列没有元素,等待队列投放元素再取出
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == 0)
notEmpty.await();
return dequeue();
finally
lock.unlock();
ArrayblcokingQueue示例代码:
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockedQueueDemo
/**
* 1. ArrayBlockQueue元素投放时线程安全的,因为使用了ReentrantLock。
* 2. 如果使用offer()方法投放的元素个数超过了队列的大小,那么会报错: queue full
* 3. offer()方法和poll()方法时非阻塞的,但put()方法和take()方法是阻塞的,即当队列满时,那么会等待队列有位置才能投放元素,当列表里没有元素时,那么线程会等待元素投放。
*/
private static ArrayBlockingQueue<Integer> arrayBlockedQueue = new ArrayBlockingQueue<>(5);
public static void main(String[] args)
for (int j = 0; j < 10; j++)
new Thread(new Consumer()).start();
for (int i = 0; i < 10; i++)
new Thread(new Provider()).start();
static class Provider implements Runnable
@Override
public void run()
int element = (int) (Math.random() * 10);
try
arrayBlockedQueue.put(element);
catch (InterruptedException e)
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + "投放一个元素:" + element);
static class Consumer implements Runnable
@Override
public void run()
try
System.out.println(Thread.currentThread().getName() + "取出一个元素:" + arrayBlockedQueue.take());
catch (InterruptedException e)
e.printStackTrace();
打印结果:
Thread-13投放一个元素:7
Thread-10投放一个元素:2
Thread-14投放一个元素:1
Thread-11投放一个元素:7
Thread-15投放一个元素:5
Thread-0取出一个元素:7
Thread-16投放一个元素:3
Thread-2取出一个元素:2
Thread-1取出一个元素:7
Thread-12投放一个元素:6
Thread-7取出一个元素:7
Thread-17投放一个元素:7
Thread-6取出一个元素:6
Thread-5取出一个元素:3
Thread-4取出一个元素:5
Thread-18投放一个元素:2
Thread-9取出一个元素:2
Thread-3取出一个元素:1
Thread-19投放一个元素:8
Thread-8取出一个元素:8
Process finished with exit code 0
以上是关于基于ReentrantLock实现简易生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章
java ReentrantLock结合条件队列 实现生产者-消费者模式
java——利用生产者消费者模式思想实现简易版handler机制