Java实现锁公平锁读写锁信号量阻塞队列线程池等常用并发工具

Posted jeysin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java实现锁公平锁读写锁信号量阻塞队列线程池等常用并发工具相关的知识,希望对你有一定的参考价值。

锁的实现

锁的实现其实很简单,主要使用Java中synchronized关键字。

public class Lock {

    private volatile boolean isLocked = false;

    private Thread lockingThread = null;

    public synchronized void lock() throws InterruptedExpection {
        while(isLocked){
            wait();
        }
        isLocked = true;
        lockingThread = Thread.currentThread();
    }

    public synchronized void unlock() {
        if(this.lockingThread != Thread.currentThread()){
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }
        isLocked = false;
        lockingThread = null;
        notify();
    }
}

公平锁的实现

上面的锁的实现严格意义上说是会存在线程饥饿现象的(也就是说在多线程竞争的条件下,存在一种极端情况,即某个线程一直阻塞在锁上,永远都是其他线程被优先唤醒,导致自己得不到执行)。下面是公平锁的实现:

/**
 * @Author: Jeysin
 * @Date: 2019/4/16 12:16
 * @Desc: 公平锁的实现,不会存在线程饿死现象。
 * 实现原理:每个线程在不同的对象上调用wait方法,Lock类可以决定调用哪个对象的notify方法,所以可以做到唤醒特定的线程
 */

public class FairLock {

    private volatile boolean isLocked = false;

    private Thread lockingThread = null;

    private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();

    public void lock() throws InterruptedException{
        QueueObject queueObject = new QueueObject();//首先给每个要加锁的线程new一个QueueObject对象
        boolean isLockedForThisThread = true;
        synchronized (this){
            waitingThreads.add(queueObject);//将这个对象添加到链表里,注意用synchronize关键字做并发控制
        }
        while(isLockedForThisThread){
            synchronized (this) {
                //判断一下当前锁是否没有被占用,并且判断当前线程对应的QueueObject是否是链表中的第一个(因为默认链表中第一个线程首先获得锁)
                isLockedForThisThread = isLocked || waitingThreads.get(0) != queueObject;
                if (!isLockedForThisThread) {
                    isLocked = true;
                    waitingThreads.remove(queueObject);
                    lockingThread = Thread.currentThread();
                    return;//链表中第一个线程加锁成功后从链表中移除自身对应的QueueObject对象,并从这条语句返回
                }
            }
            try{
                queueObject.doWait();//其他线程阻塞在这条语句上
            }catch (InterruptedException e){
                synchronized (this){
                    waitingThreads.remove(queueObject);
                    throw e;
                }
            }
        }
    }

    public synchronized void unlock(){
        if(this.lockingThread != Thread.currentThread()){
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }
        isLocked = false;
        lockingThread = null;
        if(waitingThreads.size() > 0){
            waitingThreads.get(0).doNotify();//默认唤醒链表中第一个对象对应的线程,达到公平的目的
        }
    }
}
/**
 * @Author: Jeysin
 * @Date: 2019/4/16 12:20
 * @Desc:
 */

public class QueueObject {

    private boolean isNotified = false;

    public synchronized void doWait() throws InterruptedException{
        while(!isNotified){
            this.wait();
        }
        this.isNotified = false;
    }

    public synchronized void doNotify(){
        this.isNotified = true;
        this.notify();
    }

    @Override
    public boolean equals(Object obj) {
        return this == obj;
    }
}

读写锁的实现

还记得秋招面试美团的时候,二面面试官的第一道编程题就是实现一个读写锁,当时不会Java,用C++写的,还记得当时用的是Linux下的pthread_mutex(也就是互斥量),耗了半个小时死活没有实现出一个读写锁,感觉怎么写都不对,都有点怀疑人生了,毫无疑问那场面试挂掉了。当时我就在想,肯定是一开始思路就错了,pthread_mutex虽然也可以实现一个锁的功能,但是离实现读写锁还是差了太远,一个pthread_mutex肯定是不行的(甚至用两个也不行,别问我是怎么知道的,我在那半个小时的面试里尝试了无数次最后还是不行)。直到最近看了Java版本的一个实现,synchronized加上wait和notify完美解决问题,我才意识到果然是一开始思路就错了,也许当时我用一个pthread_mutex和一个pthread_cond就可以解决问题。现在想来,要实现一个读写锁最关键的地方要有线程的唤醒机制,notify可以做到,pthread_cond也可以做到,但是光用pthread_mutex是不可能做到的。啥也不说了,Java大法好。

/**
 * @Author: Jeysin
 * @Date: 2019/4/16 22:01
 * @Desc: 不可重入的读写锁实现
 */

public class ReadWriteLock {
    private volatile int readers = 0;
    private volatile int writers = 0;
    private volatile int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException{
        while(writers > 0 || writeRequests > 0){
            this.wait();
        }
        ++readers;
    }

    public synchronized void unlockRead(){
        --readers;
        this.notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        ++writeRequests;
        while(readers > 0 || writers > 0){
            wait();
        }
        --writeRequests;
        ++writers;
    }

    public synchronized void unlockWrite(){
        --writers;
        notifyAll();
    }
}

顺带附上一个可重入版本的读写锁实现:

/**
 * @Author: Jeysin
 * @Date: 2019/4/16 22:33
 * @Desc: 可重入读写锁的实现
 */

public class ReentrantReadWriteLock {

    private Map<Thread, Integer> readingThreadsMap = new HashMap<Thread, Integer>();

    private volatile int writers = 0;

    private volatile int writeRequests = 0;

    private volatile Thread writingThread = null;

    public synchronized void lockRead() throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(!canGrantReadAccess(callingThread)){
            wait();
        }
        readingThreadsMap.put(callingThread,getAccessCount(callingThread) + 1);
    }

    public synchronized void unlockRead(){
        Thread callingThread = Thread.currentThread();
        int count = getAccessCount(callingThread);
        if(count == 1){
            readingThreadsMap.remove(callingThread);
        }else {
            readingThreadsMap.put(callingThread, count-1);
        }
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        ++writeRequests;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        --writeRequests;
        ++writers;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite(){
        --writers;
        if(writers == 0){
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread){
        if(readingThreadsMap.size() > 0){
            return false;
        }
        if(writers > 0 && writingThread != callingThread){
            return false;
        }
        return true;
    }

    private boolean canGrantReadAccess(Thread callingThread){
        if(writers > 0){
            return false;
        }
        if(readingThreadsMap.get(callingThread) != null){
            return true;
        }
        if(writeRequests > 0){
            return false;
        }
        return true;
    }

    private Integer getAccessCount(Thread callingThread){
        Integer count = readingThreadsMap.get(callingThread);
        if(count == null){
            return 0;
        }
        return count;
    }
}

信号量

信号量的实现同样也可以借用synchronized关键字,不得不说,synchronized大法好啊~

/**
 * @Author: Jeysin
 * @Date: 2019/4/18 15:16
 * @Desc: 信号量的实现
 */

public class Semaphore {

    private volatile boolean signal = false;

    public synchronized void take(){
        this.signal = true;
        this.notify();
    }

    public synchronized void release() throws InterruptedException{
        while(!this.signal){
            wait();
        }
        this.signal = false;
    }
}
/**
 * @Author: Jeysin
 * @Date: 2019/4/18 15:21
 * @Desc: 有上限的信号量的实现
 */

public class BoundedSemaphore {

    private volatile int signal = 0;

    private volatile int bound = 0;

    public BoundedSemaphore(int bound){
        this.bound = bound;
    }

    public synchronized void take() throws InterruptedException{
        while(this.signal == this.bound){
            wait();
        }
        ++signal;
        notify();
    }

    public synchronized void release() throws InterruptedException{
        while(signal == 0){
            wait();
        }
        --signal;
        notify();
    }
}

阻塞队列

/**
 * @Author: Jeysin
 * @Date: 2019/4/18 15:43
 * @Desc: 阻塞队列的实现
 */

public class BlockQueue {

    private List queue = new LinkedList();

    private volatile int limit = 10;

    public BlockQueue(int limit){
        this.limit = limit;
    }

    public synchronized void enqueue(Object object) throws InterruptedException{
        while(this.queue.size() > limit){
            wait();
        }
        if(this.queue.size() == 1){
            notifyAll();
        }
        queue.add(object);
    }

    public synchronized Object dequeue() throws InterruptedException{
        while(this.queue.size() == 0){
            wait();
        }
        if(this.queue.size() == limit){
            notifyAll();
        }
        return this.queue.remove(0);
    }
}

线程池

有了阻塞队列,线程池的实现就很简单了

/**
 * @Author: Jeysin
 * @Date: 2019/4/18 16:07
 * @Desc: 线程池的实现
 */

public class ThreadPool {

    private BlockingQueue<Runnable> taskQueue = null;

    private List<PoolThread> threads = new ArrayList<PoolThread>();

    private volatile boolean isStopped = false;

    public ThreadPool(int threadNums, int maxTaskNums){
        this.taskQueue = new LinkedBlockingQueue<Runnable>(maxTaskNums);
        for(int i=0; i<threadNums; ++i){
            threads.add(new PoolThread(taskQueue));
        }
        for(PoolThread poolThread : threads){
            poolThread.start();
        }
    }

    public synchronized void execute(Runnable task){
        if(this.isStopped){
            throw new IllegalStateException("Thread pool is stopped");
        }
        this.taskQueue.add(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThread poolThread : threads){
            poolThread.toStop();
        }
    }
}
/**
 * @Author: Jeysin
 * @Date: 2019/4/18 16:09
 * @Desc:
 */

public class PoolThread extends Thread {

    private BlockingQueue<Runnable> taskQueue = null;

    private volatile boolean isStopped = false;

    public PoolThread(BlockingQueue<Runnable> queue){
        this.taskQueue = queue;
    }

    @Override
    public void run() {
        while(!isStopped){
            try{
                Runnable runnable = taskQueue.take();
                runnable.run();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public synchronized void toStop(){
        isStopped = true;
        this.interrupt();
    }
}

参考文章:
http://tutorials.jenkov.com/java-concurrency/index.html

以上是关于Java实现锁公平锁读写锁信号量阻塞队列线程池等常用并发工具的主要内容,如果未能解决你的问题,请参考以下文章

java锁总结

Java核心---线程进阶

Java核心---线程进阶

Java核心---线程进阶

Java多线程Java面试题

Java多线程Java面试题