精从HDFS的文件系统锁到读写锁

Posted Java不睡觉

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了精从HDFS的文件系统锁到读写锁相关的知识,希望对你有一定的参考价值。

通过本文,你将了解到:

  1. HDFS中的FSNamesystemLock是什么?

  2. 读写锁ReentrantReadWriteLock、写锁降级以及ReentrantReadWriteLock源码对写线程饥饿情况做的一点优化。

  3. 简要了解JDK 8的新读写锁:StampedLock。

一、HDFS中的FSNamesystemLock

了解HDFS的朋友都知道,HDFS中有一把大锁。它是HDFS中令人又爱又恨的一个存在。爱是因为:它实现方式简单,保护整个文件系统在并操作下的正确性。恨是因为:正是由于它实现方式简单,锁的粒度很粗,是限制HDFS性能的一个大石!举个例子:比如removeBlocks和completeFile这两个方法。都需要获取写锁,那这样就得等对方执行完之后释放写锁自己才可以获取写锁执行。在集群规模大起来之后,对文件系统修改的RPC请求多起来之后,这把大锁对HDFS性能的限制可想而知。

removeBlocks
completeFile

我们先来分析一下FSNamesystemLock的实现。

class FSNamesystemLock {

  protected ReentrantReadWriteLock coarseLock;
  // 构造函数,本质上是可重入读写锁
  FSNamesystemLock(Configuration conf,
      MutableRatesWithAggregation detailedHoldTimeMetrics, Timer timer) {
    boolean fair = conf.getBoolean(DFS_NAMENODE_FSLOCK_FAIR_KEY,
        DFS_NAMENODE_FSLOCK_FAIR_DEFAULT);
    FSNamesystem.LOG.info("fsLock is fair: " + fair);
    this.coarseLock = new ReentrantReadWriteLock(fair);
  }

  // 读锁
  public void readLock() {
    coarseLock.readLock().lock();
    if (coarseLock.getReadHoldCount() == 1) {
      readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
    }
  }
  // 写锁
  public void writeLock() {
    coarseLock.writeLock().lock();
    if (coarseLock.getWriteHoldCount() == 1) {
      writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
    }
  }
}

可以看到FSNamesystemLock本质上就是一个读写锁,公平与非公平性由配置项获取,默认是公平锁。

由于HDFS在设计之初就是针对读多写少的场景,也即最好是:“一次写入,多次读取”。所以在读多写少的场景下,用读写锁也不失为一个解决并发问题的好选择。但是读写锁也有缺点,比如写饥饿问题等等,这个我们在读写锁一节再分析。而且随着集群规模的扩大,写请求越来越多,全局的读写锁粒度可能确实是太大了。所以很多公司HDFS团队已经有了拆锁的解决方法。

二、ReentrantReadWriteLock相关

为什么会有读写锁呢?思考一下。

计算机领域发明新事物总是朝着更高效,更便捷、榨干机器的最后一滴油水的方向迈进。

在没有读写锁之前,ReentrantLock可以保证线程安全,但是会牺牲一定的性能,因为如果是多个读操作并发执行,其实并不会产生线程安全问题,但是只能一个线程一个线程一个线程地读,可见性能不是很高。

因此设计出了读写锁,读写锁有两个部分,一个读锁、一个写锁。读锁可以被多个线程共享,也即多个线程可以同时读,读锁中是不可以获得写锁的,必须等最后一个持有读锁的线程释放读锁后,等待获取写锁的线程才有可能获得写锁。写锁是排他的,当一个线程持有写锁时,其他线程既不可以读,也不可以写。但是在写锁内是可以获取读锁的,这种操作也被称为“锁降级”。

举一个ReentrantReadWriteLock的官方示例,包含了“锁降级”(读一下相关注释):

 class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // Must release read lock before acquiring write lock
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        try {
          // Recheck state because another thread might have
          // acquired write lock and changed state before we did.
          if (!cacheValid) {
            data = ...
            cacheValid = true;
          }
          // Downgrade by acquiring read lock before releasing write lock
         // 这里在释放写锁之前,获取了读锁。防止其他写线程对数据修改。
          rwl.readLock().lock();
        } finally {
          rwl.writeLock().unlock(); // Unlock write, still hold read
        }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

写锁饥饿现象:

考虑这样一种情形(脱离实际JDK源码):在读多写少的场景下,当前是读线程获取了读锁,由于读锁是共享的,所以后面的读请求都可以获取读锁,从而导致写锁几乎获取不到,这就是写锁饥饿。更具体一点:
当线程 A 持有读锁读取数据时,线程 B 要获取写锁修改数据就只能到队列里排队。此时又来了线程 C 读取数据,那么线程 C 就可以获取到读锁,而要执行写操作线程 B 就要等线程 C 释放读锁。由于该场景下读操作远远大于写的操作,此时可能会有很多线程来读取数据而获取到读锁,那么要获取写锁的线程 B 就只能一直等待下去,最终导致饥饿。虽然使用“公平锁”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。

针对写饥饿这种情形,JDK中的读写锁做了一些启发式的优化。其中在readerShouldBlock()方法中的注释如下(后文会翻译一下):

    final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */

            return apparentlyFirstQueuedIsExclusive();
        }

    // return中有个条件是:AQS中等待队列的第一个节点是非Share的模式。
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

As a heuristic to avoid indefinite writer starvation, block if the thread that momentarily appears to be head of queue, if one exists, is a waiting writer.

翻译一下这句话:作为避免无限的写饥饿的一个启发,如果等待队列中的第一个节点是等待获取写锁的线程,那就阻止当前reader获取读锁。

再用个人理解解释一下,就是说:假设当前是读写锁是被读锁hold。假设A线程要获取写锁,它会先在AQS的等待队列中自旋,并且它是第一个节点:head.next。接着,B线程要获取读锁,会进行readerShouldBlock()判断第一个等待的节点是不是等待写锁,发现是的,于是阻止自己获取读锁。这样做就能有效的防止读多写少场景下写饥饿情况的过度发生。

好的,接下来稍微简要分析一下读锁和写锁获取锁的源码(不讲获取锁失败后,入等待队列的事),需要读者有AQS的基本了解。不了解也无妨,可直接看中文总结。

首先来看写锁的tryAcquire方法,获取写锁:

注释里的Walkthrough翻译一下:

  1. 如果读线程数非零或者 写线程数非0且持锁线程不是当前线程,则获取锁失败,返回false。(后续将自己加入到等待队列,不在本文范围之内)

  2. 如果线程数饱和了,则获取锁失败,返回false。

  3. 否则,这个线程是有资格获取写锁的,不管是线程自己重入或者在等待队列中轮到它获取写锁。如果获取成功,更新state变量并设置锁的owner。

整个的尝试获取锁的代码也正如上面描述的逻辑一样。值得注意的一个小细节是:writerShouldBlock()的判断结果。对于非公平模式来说,尝试获取写锁的线程永远返回false,即不需阻止获取写锁。对于公平模式来说,如果获取写锁的线程在等待队列里有前驱节点,返回true,即需要阻止获取写锁,如果没有前驱节点,则返回false,即为不需要阻止获取写锁。

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

接下来看读锁的tryAcquireShared方法,尝试获取共享锁。翻译一下源码中的WalkThrough注释:

  1. 如果被一个其他线程只有写锁,那么获取共享锁失败。

  2. 否则,当前线程是有资格获取共享锁的,接着根据排队规则确认当前是否需要阻塞。如果不需要阻塞,尝试用CAS设置state。

  3. 如果步骤2失败了,要么因为线程明显没有获取共享锁的资格,要么因为CAS失败或者线程数饱和了,则进到完整的retry循环中自旋。

如何判断当前线程是有资格获取共享锁的呢?就是我们之前说的JDK对读写锁的写饥饿优化:readerShouldBlock方法。对于非公平模式来说:如果等待队列中第一个等待的线程是在尝试获取写锁,那么当前线程就进入阻塞状态,优先让写线程获取写锁。这么做是为了一定程度上缓解写饥饿,是一个启发式的做法(jdk 源码注释中这么写的)。对于公平模式来说,如果获取读锁的线程在等待队列里有前驱节点,返回true,即需要阻止获取读锁,如果没有前驱节点,则返回false,即为不需要阻止获取读锁。

protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */

            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

三、简单了解一下StampedLock

StampedLock是解决什么问题的呢?
前面的ReentrantReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。但是深入思考之后,会有一个这样的限制:“如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。”

为了进一步提高表发执行效率,Java 8引入了新的读写锁:StampedLock。

StampedLock相对于ReentrantReadWriteLock的改进思想在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

来看一下java api中给的示例代码:

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();

   void move(double deltaX, double deltaY) // an exclusively locked method
     // 排他写锁模式
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }

   double distanceFromOrigin() // A read-only method
     // 乐观读
     long stamp = sl.tryOptimisticRead();
     double currentX = x, currentY = y;
     // 验证是否有写操作修改过,有的话进到if里加读锁
     if (!sl.validate(stamp)) {
        stamp = sl.readLock();
        try {
          currentX = x;
          currentY = y;
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }

   void moveIfAtOrigin(double newX, double newY) // upgrade
     // Could instead start with optimistic, not read mode
     long stamp = sl.readLock();
     try {
       while (x == 0.0 && y == 0.0) {
         //  升级成写锁。
         long ws = sl.tryConvertToWriteLock(stamp);
         if (ws != 0L) {
           stamp = ws;
           x = newX;
           y = newY;
           break;
         }
         else {
           sl.unlockRead(stamp);
           stamp = sl.writeLock();
         }
       }
     } finally {
       sl.unlock(stamp);
     }
   }
 }

更详细的StampedLock类的用法,大家可以参考JDK 8的API文档。部分文档如下图

一种基于功能的锁,具有三种模式来控制读/写访问。StampedLock的状态包括版本和模式。锁获取方法返回一个表示和控制对锁状态的访问的戳记;这些方法的“try”版本可能会返回特殊值0来表示获取访问失败。锁释放和转换方法需要戳作为参数,如果它们与锁的状态不匹配则失败。三种模式分别是:

写:writeLock()方法可能阻塞以等待独占访问,返回一个stamp,该stamp可以在unlockWrite(long)方法中使用以释放写锁。还提供了tryWriteLock的带获取锁超时时间版本和不带获取锁超时时间的版本。当锁以写模式持有时,读锁是无法获取到的,所有乐观读的validations都将失败。

读:readLock()方法可能阻塞以等待非排他访问,返回一个戳记,该戳记可以在unlockRead(long)方法中使用以释放锁。还提供了tryReadLock的的带获取锁超时时间版本和不带获取锁超时时间的版本。

乐观读:tryOptimisticRead()方法只有在当前锁不是以写模式持有时,返回一个非零stamp。如果在获得给定stamp后未以写模式获取过锁,方法validate(long)就返回true。这种模式可以看作是读锁的一个非常弱的版本,可以被写线程在任何时候打破。对短的只有读操作的代码段使用乐观模式通常会减少并发竞争并提高吞吐量。然而,它的使用本质上是脆弱的。乐观读代码section应该只读取字段,并将它们保存在局部变量中,以便在验证后使用。在乐观模式下读取的字段可能非常不一致,因此只有当你足够熟悉数据表示以检查一致性或者反复调用validate()方法验证是否有修改过时,再来使用该方法。例如,当第一次读取对象或数组引用,然后访问其中的一个字段、元素或方法时,通常需要这样的步骤(防止对象或数组引用被其他获取到写锁的线程修改)。

参考
https://docs.oracle.com/javase/8/docs/api/
https://www.liaoxuefeng.com/wiki/1252599548343744/1309138673991714


以上是关于精从HDFS的文件系统锁到读写锁的主要内容,如果未能解决你的问题,请参考以下文章

HDFS读写机制剖析

读写锁 与 互斥锁

从JVM锁到Redis分布式锁,对小白十分友好

图文详解HDFS 系统架构与文件数据读写流程

java中ReentrantReadWriteLock读写锁的使用

HDFS架构及文件读写流程