ReentrantReadWriteLock读写锁原理解析

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantReadWriteLock读写锁原理解析相关的知识,希望对你有一定的参考价值。

ReentrantLock及Condition原理解析

前言

上篇文章主要介绍锁的概念,及ReentrantLock及Condition原理解析  ;如何不使用synchronized也能创建一个锁,并且相对于关键字来说更加优化。

本篇文章继续介绍juc包下面的ReentrantReadWriteLock,比可重入锁,粒度更小,更适合与读写操作,在不同的应用场景下有更好的使用

读写锁

为什么读锁需要加锁,如果只在写的地方加锁,我们只能保证写数据时出现原子性操作,但是同时另外个线程同时读取数据,是会出现数据不一致的后果 ;这就是读写之间没有同步加锁导致的问题

public class ReadWrite {

	static volatile int i = 0;
	static volatile int j = 0;

	public static void read() {
		System.out.println("i=" + i + ";j=" + j);
	}

	public static void write() throws InterruptedException {
		synchronized (ReadWrite.class) {
			i++;

			Thread.sleep(1000);
			j++;
		}

	}

	public static void main(String[] args) throws InterruptedException {

		for (int i = 0; i < 10; i++) {
			int n = i;
			new Thread(new Runnable() {
				@Override
				public void run() {
					while (true) {
						try {
							write();
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}

				}
			}).start();
		}

		for (;;) {
			read();
			Thread.sleep(500);
		}
	}

}

通过这个小例子可以看出 如果不用读锁,会造成i和j的值读取的时候不一致,失掉原子性,例如常见的多线程写入文件,还有读取文件时,会造成数据不一致原子性问题

i=5;j=5
i=7;j=6
i=7;j=7
i=9;j=8
i=9;j=9

解决办法

  • 可以同时给读和写操作加锁,达到多线程间同步操作,保证读数据的正常

  • 还有就是这篇文章会介绍的ReadWriteLock  ,当读操作时可以多线程同时共享锁,写操作时,多线程互斥锁,读写操作也会互斥锁

 

locks层次结构

 

 概念

维护一对关联锁,一个只用于 操作,一个只用于 操作;读锁可以由多个线程同时持有,写锁是排他的。同一段时间,两把锁不能被不同的线程持有。
怎么读写锁ReadWriteLock,以下面的例子读写锁的使用
public class ReadWrite {

	static volatile int i = 0;
	static volatile int j = 0;

	static ReadWriteLock rwLock = new ReentrantReadWriteLock();

	public static void read() {
		rwLock.readLock().lock();
		System.out.println("i=" + i + ";j=" + j);
		rwLock.readLock().unlock();
	}

	public static void write() throws InterruptedException {
		rwLock.writeLock().lock();
		i++;

		Thread.sleep(100);
		j++;
		rwLock.writeLock().unlock();

	}

	public static void main(String[] args) throws InterruptedException {

		for (int i = 0; i < 10; i++) {
			int n = i;
			new Thread(new Runnable() {
				@Override
				public void run() {
					while (true) {
						try {
							if (n == 0) {
								write();
							} else {
								read();
							}

						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}

				}
			}).start();
		}
	}

}

使用单线程去写数据,多线程去读数据,读到的数据最后都是相等的。

当我把n多加几个线程时,这个效率也会下降很多,这就能引出下面读写锁的适用场景

使用场景

适合 读取 操作 多于写入 操作的场景,改进互斥锁的性能,
比如: 集合的并发线程安全性改造 缓存组件

原理

提升互斥锁的性能,源码中的属性

 /** 提供读锁的内部类 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 提供写锁的内部类 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 执行所有同步机制 */
    final Sync sync;

 演示实例

public class ReadWriteMap {
    private final Map<String, Object> m = new HashMap<>();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock r = lock.readLock();
    private final Lock w = lock.writeLock();

    public Object get(String key){
        r.lock();
        try {
            return m.get(key);
        }finally {
            r.unlock();
        }
    }

    public Object[] allKeys(){
        r.lock();
        try {
            return m.keySet().toArray();
        }finally {
            r.unlock();
        }
    }

    public Object put(String key, Object value){
        w.lock();
        try {
            return m.put(key, value);
        }finally {
            w.unlock();
        }
    }

    public void clear(){
        w.lock();
        try {
            m.clear();
        }finally {
            w.unlock();
        }

    }



}

特性:

  • 获取顺序

       在默认构造方法中设置

  public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

         非公平模式(默认)

    当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能 会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。

         公平模式

    当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。
    当有写线程持有写锁或者有等待的写线程时,一个尝试获取公平的读锁(非重入)的线程就会阻塞。这个线程直到等待时间最长的写锁获得锁后并释放掉锁后才能获取到读锁。

  • 可重入

    允许读锁可写锁可重入。写锁可以获得读锁,读锁不能获得写锁。

  • 锁降级

    允许写锁降低为读锁中断锁的获取在读锁和写锁的获取过程中支持中断

  • 支持Condition

    写锁提供Condition实现监控提供确定锁是否被持有等辅助方法

缓存雪崩、及锁降级

读写锁应用在缓存上,防止缓存雪崩,产生缓存雪崩的原因

  //读数据,加读锁
        rwl.readLock().lock();
        try {
            if (cacheValid){
                data = Redis.data.get(dataKey);
            }else{
                data= DataBase.queryUserInfo();       //可能存在缓存雪崩的场景
                //useData()....
            }

            return data;
        }finally {
            rwl.readLock().unlock();
        }

问题来源,我们去获取数据是加读锁,多个线程可以共享锁,读取数据,但是当缓存可能失效的情况,会需要在读取数据库数据,更新缓存,如果大量线程同一时间进来,就有可能导致同时访问数据库,造成缓存雪崩;

解决办法就是利用读写锁中的 锁降级机制解决问题

//data= DataBase.queryUserInfo();       //可能存在缓存雪崩的场景
                rwl.readLock().unlock();

                //加写锁之后,并不会马上获取到所,会等到所有的读锁释放
                rwl.writeLock().lock();
                try {
                    if (!cacheValid){
                        data = DataBase.queryUserInfo();
                        Redis.data.put(dataKey,data);

                        cacheValid = true;
                    }
                    //获取读锁,进行锁降级
                    rwl.readLock().lock();

                }finally {
                    rwl.writeLock().unlock();
                }

一定要写锁内部添加读锁进行锁降级。

写锁和读锁是互斥的,但是锁降级可以使写锁内部添加读锁

锁降级

指的是 写锁降级成为读锁 。持有写锁的同时,再获取读锁,随后释放写锁的过程。
写锁是线程独占,读锁是共享,所以写->读是降级。(读->写,是不能实现的)
前提是同一个线程。

读写锁的原理

  • 第一次进行去获取写锁,判断读锁释放被占用(readstate==0),如果未被占用则在判断读锁 释放被占用(writestate==0),未被占用则writestate+1,并且将owner==th1当前线程

  • 第二次相同线程还是写锁进来,在判断读锁是否为0,然后 判断写锁状态,并writestate +1

  • 锁降级操作,写锁状态不等于0,并且线程是自己,就要锁降级,利用cas进行readstate修改

  •  读锁获取,判断写锁是否被占用并且不是本线程,抢锁失败,直接放到队列中

  • 然后继续下午不断的读锁,进来都会进入等待队列中

  • 如果读锁进来,写锁没被占用那就获取到锁

 ReentrantReadWriteLock实现码

源码中构造方法中初始化对应写锁,和读锁,以及对应的 公平锁代理类和非公平锁代理类 ,不公平的锁,有可能有个线程释放锁,但是正好有一个线程过来就抢到锁,而队列头部的反而没抢到。

 public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
 public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;
  public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;

源码中都会持有 sync对象  ,并且隔绝开公平锁和非公平锁

ReadLock类中的lock和unlock方法:

   public void lock() {
            sync.acquireShared(1);
        }
   public void unlock() {
            sync.releaseShared(1);
        }
  • 判断共享锁的逻辑代码,在AbstractQueuedSynchronizer 方法中实现;

在共享模式下获取,忽略中断。实施人首先调用至少一次{@link#tryAcquireShared},回归成功。否则线程可能会排队反复阻塞和取消阻塞,调用直到成功。 

   public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

尝试去获取共享锁,比较源码中 tryAcquireShared 部分代码 。

 protected final int tryAcquireShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 获取状态
    int c = getState();

    //如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 读锁数量
    int r = sharedCount(c);
    /*
     * readerShouldBlock():读锁是否需要等待(公平锁原则)
     * r < MAX_COUNT:持有线程小于最大数(65535)
     * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
     */
     // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
        if (r == 0) { // 读锁数量为0
            // 设置第一个读线程
            firstReader = current;
            // 读线程占用的资源数为1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
            // 占用资源数加1
            firstReaderHoldCount++;
        } else { // 读锁数量不为0并且不为当前线程
            // 获取计数器
            HoldCounter rh = cachedHoldCounter;
            // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
            if (rh == null || rh.tid != getThreadId(current))
                // 获取当前线程对应的计数器
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0) // 计数为0
                //加入到readHolds中
                readHolds.set(rh);
            //计数+1
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
  • 以共享不间断模式获取锁
 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

WriteLock类中的lock和unlock方法:

  public void lock() {
            sync.acquire(1);
        }

 public boolean tryLock( ) {
            return sync.tryWriteLock();
        }


 public void unlock() {
            sync.release(1);
        }
  • 对应 抽象同步队列中的方法
 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • 包括在释放锁中的操作
 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

AQS

源码中对应的同步器方法,提供了对资源占用、释放,线程的挂起、唤醒的逻辑。 预留了各种 try 方法给用户实现,可以用在各种需要控制资源争用的场景中。(ReentrantLock/CountDownLatch/Semphore)

预留了大量方法给我们 我们实现,  state用高8位和 低8位来做为 识别读状态和写状态,保证更好的原子性。

源码用链表实现了一个队列。

总结

整篇文章介绍了读写锁,从为什么使用读写锁,以及读写锁的使用场景。优缺点来证明出读写锁的优点,整个读写锁,什么是锁降级,在写锁中,进行读锁操作,可以进行锁降级,而反之则不能。

如果当读锁被占用时,有写锁过来线程一定会加入阻塞队列,反之则不一定。

例如源码中利用aqs中模板方法提供了很大的可扩展性,都是值得学习的。

以上是关于ReentrantReadWriteLock读写锁原理解析的主要内容,如果未能解决你的问题,请参考以下文章

[图解Java]读写锁ReentrantReadWriteLock

JUC中的读写锁(ReentrantReadWriteLock)

读写锁 ReentrantReadWriteLock

ReentrantReadWriteLock读写锁的使用

AQS系列- ReentrantReadWriteLock读写锁的加锁

ReentrantReadWriteLock读写锁