《并发系列二》自己动手实现互斥锁

Posted PIGP

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《并发系列二》自己动手实现互斥锁相关的知识,希望对你有一定的参考价值。

经过上一篇文章的学习我们已经大致了解了AQS互斥锁的原理,本文将使用AQS实现自己的互斥锁,通过自己实现互斥锁来加深对AQS的理解,同时讲解锁的可重入性。

自定义实现互斥锁

使用AQS时,通常将在类的内部定义一个子类继承AQS,实现互斥锁需要实现的方法,通过子类使锁的实现透明化,代码如下:

 
   
   
 
  1. public class Mutex {

  2.    Sync sync = new Sync();

  3.    public void lock(){

  4.        sync.acquire(1);

  5.    }

  6.    public void unlock(){

  7.        sync.release(0);

  8.    }

  9.    class Sync extends AbstractQueuedSynchronizer{

  10.        @Override

  11.        protected boolean tryAcquire(int arg) {

  12.            if(compareAndSetState(0, 1)){

  13.                return true;

  14.            }

  15.            return false;

  16.        }

  17.        @Override

  18.        protected boolean tryRelease(int arg) {

  19.            if(getState()==0){

  20.                throw new IllegalArgumentException("state is error");

  21.            }

  22.            setState(0);

  23.            return true;

  24.        }

  25.    }

  26. }

由代码可以看出,我们定义了一个内部类Sync继承了AbstractQueuedSynchronizer,实现与互斥锁相关的方法tryAcquire与tryRelease方法。 在该类中,我们设计为当state为0是表示锁可用,state为1表示锁已经被其他线程占有,获取锁需要等待。

  • tryAcquire

通过CAS原子操作将state的值由0修改为1,如果state值为0,则可以修改成功获取锁,如果state值为1,则CAS操作失败,按照上节将的流程进入等待队列

  • tryRelease

判断state的状态是否合法,如果state为0, 则状态出错,抛出异常;否则,状态正常,直接设置sate置为0,由于互斥锁中获得锁的线程只有一个,释放锁的线程就只有一个,不会出现并发问题,直接使用setState()方法进行设置即可

编写测试类进行测试

测试代码中,我们启动5个线程,使用自定义锁Mutex去控制线程为串行执行,代码如下:

 
   
   
 
  1. public class MutexTest {

  2.    static class Worker implements Runnable{

  3.        Mutex mutex;

  4.        public Worker(Mutex mutex) {

  5.            this.mutex = mutex;

  6.        }

  7.        @Override

  8.        public void run() {

  9.            mutex.lock();

  10.            try {

  11.                System.out.println(Thread.currentThread().getName()+"====获取锁");

  12.                Thread.sleep(1000);

  13.                System.out.println(Thread.currentThread().getName() + new Date().toString());

  14.            } catch (InterruptedException e) {

  15.                e.printStackTrace();

  16.            }finally {

  17.                System.out.println(Thread.currentThread().getName()+"====释放锁");

  18.                mutex.unlock();

  19.            }

  20.        }

  21.    }

  22.    public static void main(String[] args) {

  23.        Mutex mutex = new Mutex();

  24.        new Thread(new Worker(mutex), "mutex-1").start();

  25.        new Thread(new Worker(mutex), "mutex-2").start();

  26.        new Thread(new Worker(mutex), "mutex-3").start();

  27.        new Thread(new Worker(mutex), "mutex-4").start();

  28.        new Thread(new Worker(mutex), "mutex-5").start();

  29.    }

  30. }

使用互斥锁,上述线程会串行的执行,串行执行的意思是一个一个的执行,由于线程调度时机不同,线程mutex-1不一定会在mutex-2前面执行,只是保证多个线程之间执行锁内代码逻辑不会交叉,执行结果如下:

 
   
   
 
  1. mutex-2====获取锁

  2. mutex-2Thu Jul 12 17:01:59 CST 2018

  3. mutex-2====释放锁

  4. mutex-1====获取锁

  5. mutex-1Thu Jul 12 17:02:00 CST 2018

  6. mutex-1====释放锁

  7. mutex-5====获取锁

  8. mutex-5Thu Jul 12 17:02:01 CST 2018

  9. mutex-5====释放锁

  10. mutex-4====获取锁

  11. mutex-4Thu Jul 12 17:02:02 CST 2018

  12. mutex-4====释放锁

  13. mutex-3====获取锁

  14. mutex-3Thu Jul 12 17:02:03 CST 2018

  15. mutex-3====释放锁

不可重入锁存在的问题

针对上面实现的锁,如果线程在获得锁之后再次申请锁时,是申请不到的,会出现自己等待自己的死锁的现象。修改上述的测试代码:

 
   
   
 
  1. package com.qunar.alpaca.Test;

  2. import java.util.Date;

  3. /**

  4. * Created by xinfeng.xu on 2018/7/12.

  5. */

  6. public class MutexTest {

  7.    static class Worker implements Runnable{

  8.        Mutex mutex;

  9.        public Worker(Mutex mutex) {

  10.            this.mutex = mutex;

  11.        }

  12.        @Override

  13.        public void run() {

  14.            mutex.lock();

  15.            try {

  16.                System.out.println(Thread.currentThread().getName()+"====获取锁");

  17.                Thread.sleep(1000);

  18.                System.out.println(Thread.currentThread().getName()+"====准备重新获取锁");

  19.                mutex.lock();

  20.                System.out.println(Thread.currentThread().getName()+"====成功重入锁");

  21.                System.out.println(Thread.currentThread().getName() + new Date().toString());

  22.            } catch (InterruptedException e) {

  23.                e.printStackTrace();

  24.            }finally {

  25.                System.out.println(Thread.currentThread().getName()+"====释放锁");

  26.                mutex.unlock();

  27.            }

  28.        }

  29.    }

  30.    public static void main(String[] args) {

  31.        Mutex mutex = new Mutex();

  32.        new Thread(new Worker(mutex), "mutex-1").start();

  33.        new Thread(new Worker(mutex), "mutex-2").start();

  34.        new Thread(new Worker(mutex), "mutex-3").start();

  35.        new Thread(new Worker(mutex), "mutex-4").start();

  36.        new Thread(new Worker(mutex), "mutex-5").start();

  37.    }

  38. }

先忽略lock两次,unlock一次的问题。在上面的代码中,在第一次lock并休眠1s后,再次lock获取锁,会发生什么样的事情,执行代码,结果如下:

 
   
   
 
  1. mutex-2====获取锁

  2. mutex-2====准备重新获取锁

从结果中可以看出,线程阻塞了,自己在第二次执行lock方法时,在等待自己释放锁,也就是说这段代码发生了死锁。

可重入锁使用场景

说实话我也不是很清楚什么时候会用锁重入的情况,就举一个简单的例子吧,如果执行一个递归方法,递归方法内部加锁了,就会出现锁重入的情况

可重入锁

现在我们需要修改之前写的互斥锁,主要修改部分如下: (1)获取锁成功后,需要将线程设置为占有锁的线程 (2)在获取锁失败后,需要判断当前线程是否为占有锁的线程,如果是则占有锁,并增加state的值 (3)为了记录线程重入的次数,保证线程lock几次,就需要unlock几次,state的值不能仅有0与1,而是修改为0表示可以获取锁,>0表示锁已经被占有,state的值表示重入的次数 (4)锁释放时,判断state值是否为0,如果为0才是真正的释放,真正释放后需要将占有锁的线程标志置空 修改后的代码如下:

 
   
   
 
  1. public class Mutex {

  2.    Sync sync = new Sync();

  3.    public void lock(){

  4.        sync.acquire(1);

  5.    }

  6.    public void unlock(){

  7.        sync.release(0);

  8.    }

  9.    class Sync extends AbstractQueuedSynchronizer{

  10.        @Override

  11.        protected boolean tryAcquire(int arg) {

  12.            int state = getState();

  13.            if(state<0){

  14.                throw new IllegalArgumentException("state is error");

  15.            }

  16.            if(state==0 && compareAndSetState(0, 1)) {

  17.                setExclusiveOwnerThread(Thread.currentThread());

  18.                return true;

  19.            }

  20.            if(Thread.currentThread()==getExclusiveOwnerThread()){

  21.                setState(state+1);

  22.                return true;

  23.            }

  24.            return false;

  25.        }

  26.        @Override

  27.        protected boolean tryRelease(int arg) {

  28.            int state = getState();

  29.            if(state==0){

  30.                throw new IllegalArgumentException("state is error");

  31.            }

  32.            setState(state-1);

  33.            if(state==1){

  34.                setExclusiveOwnerThread(null);

  35.            }

  36.            return true;

  37.        }

  38.    }

  39. }

由此,我们就完成了可重入的互斥锁的实现

总结

锁的实现,我们只需要继承AQS,实现响应的扩展方法即可,对于互斥锁我们只需要实现tryAcquire与tryRelease方法即可。下一节将讲解AQS对于共享锁的实现原理

欢迎扫描下方二维码,关注公众号,我们可以进行技术交流,共同成长


以上是关于《并发系列二》自己动手实现互斥锁的主要内容,如果未能解决你的问题,请参考以下文章

自己动手写数据库:并发管理器的实现,以及并发交易流程的分析

自己动手写数据库:并发管理器的实现,以及并发交易流程的分析

自己动手写数据库:并发管理组件lock_table的原理和实现

自己动手写数据库:并发管理组件lock_table的原理和实现

Go语言自学系列 | golang并发编程之Mutex互斥锁实现同步

高并发基石多线程守护线程线程安全线程同步互斥锁