        继上篇《HBase源代码分析之HRegion上MemStore的flsuh流程(一)》之后。我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache()。它的主要流程如图所看到的:



   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
   * memstore, all of which have also been written to the wal. We need to write those updates in the
   * memstore out to disk, while being able to process reads/writes as much as possible during the
   * flush operation.
   * <p>This method may block for some time.  Every time you call it, we up the regions
   * sequence id even if we don‘t flush; i.e. the returned region id will be at least one larger
   * than the last edit applied to this region. The returned id does not refer to an actual edit.
   * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
   * that was the result of this flush, etc.
   * @return object describing the flush‘s state
   * @throws IOException general io exceptions
   * @throws DroppedSnapshotException Thrown when replay of wal is required
   * because a Snapshot was not properly persisted.
  protected FlushResult internalFlushcache(MonitoredTask status)
      throws IOException {
    return internalFlushcache(this.wal, -1, status);

   * @param wal Null if we‘re NOT to go via wal.
   * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
   * @return object describing the flush‘s state
   * @throws IOException
   * @see #internalFlushcache(MonitoredTask)
  protected FlushResult internalFlushcache(
      final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
	//  假设RegionServerServices类型的rsServices不为空,且为夭折的。直接抛出异常
	if (this.rsServices != null && this.rsServices.isAborted()) {
      // Don‘t flush when server aborting, it‘s unsafe
      throw new IOException("Aborting flush because server is aborted...");
	// 获取開始时间
    final long startTime = EnvironmentEdgeManager.currentTime();
    // If nothing to flush, return, but we need to safely update the region sequence id
    // 假设没有能够刷新的缓存,直接返回,可是我们须要安全的更新Region的sequence id
    if (this.memstoreSize.get() <= 0) {
      // Take an update lock because am about to change the sequence id and we want the sequence id
      // to be at the border of the empty memstore.
      // 获取一个更新锁。由于我们即将要更新一个序列ID,而且我们想让这个序列ID成为一个空的memstore的边界
      MultiVersionConsistencyControl.WriteEntry w = null;
      // 获取更新锁的写锁
      try {
        if (this.memstoreSize.get() <= 0) {
          // Presume that if there are still no edits in the memstore, then there are no edits for
          // this region out in the WAL subsystem so no need to do any trickery clearing out
          // edits in the WAL system. Up the sequence number so the resulting flush id is for
          // sure just beyond the last appended region edit (useful as a marker when bulk loading,
          // etc.)
          // wal can be null replaying edits.
          // 假设假设有memstore仍然没有数据。
          if (wal != null) {
            w = mvcc.beginMemstoreInsert();
            long flushSeqId = getNextSequenceId(wal);
            FlushResult flushResult = new FlushResult(
                FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
            w = null;
            return flushResult;
          } else {
            return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
                "Nothing to flush");
      } finally {
        if (w != null) {

    LOG.info("Started memstore flush for " + this +
      ", current region memstore size " +
      StringUtils.byteDesc(this.memstoreSize.get()) +
      ((wal != null)?

"": "; wal is null, using passed sequenceid=" + myseqid)); // Stop updates while we snapshot the memstore of all of these regions‘ stores. We only have // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush // 当我们更新全部这些region存储的memstore的快照时,停止更新操作。

// 我们这样做一瞬间,它是非常迅速的。

// 在我们同意再次更新时,我们也设置memstore的大小为0,所以它的大小也代表了在flush期间接收到的更新的大小 // 创建多版本号一致性控制器中的写条目 MultiVersionConsistencyControl.WriteEntry w = null; // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) // 我们须要在快照期间的一个更新锁。否则一个写入终于在快照与内存之前完毕(届时将非常难做原子行的保证) // 获得锁以堵塞并发的更新 // 设置状态跟踪器的状态:获取锁以堵塞并发的更新 status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush // 堵塞,等待flush的锁 // 获得updatesLock的写锁。堵塞全部对于该Region的更新操作。 this.updatesLock.writeLock().lock(); long totalFlushableSize = 0; // 设置状态跟踪器的状态:正在准备通过创建存储的快照刷新 status.setStatus("Preparing to flush by snapshotting stores in " + getRegionInfo().getEncodedName()); // 创建两个缓存容器:storeFlushCtxs列表和committedFiles映射集合,用来存储刷新过程中的刷新上下文和已完毕文件路径 List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size()); TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>( Bytes.BYTES_COMPARATOR); // 刷新的序列号ID long flushSeqId = -1L; long trxId = 0; try { try { // mvcc推进一次写操作事务。此时w中的写序号为0 w = mvcc.beginMemstoreInsert(); // 获取刷新序列号ID,假设wal不为空。通过wal取下一个序列号,否则设置为-1 if (wal != null) {// 假设wal不为空 // startCacheFlush实际上做了两件事: // 1、调用closeBarrier.beginOp()方法。确定開始一个flush操作; // 2、Region名相应的近期序列化Id从数据结构 // oldestUnflushedRegionSequenceIds移动到lowestFlushingRegionSequenceIds中 // 疑问:oldestUnflushedRegionSequenceIds中数据是何时放入的?用它来做什么呢? // 在FSHLog的append()方法中。假设entry.isInMemstore()。putIfAbsent放入的 if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { // This should never happen. String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } // Get a sequence id that we can use to denote the flush. It will be one beyond the last // edit that made it into the hfile (the below does not add an edit, it just asks the // WAL system to return next sequence edit). // wal不为空的话。获取下一个序列号 flushSeqId = getNextSequenceId(wal); } else { // use the provided sequence Id as WAL is not being used for this flush. // 这里myseqid传递进来的是-1 flushSeqId = myseqid; } // 循环该Region全部的store。预处理storeFlushCtxs、committedFiles // 1、累加每一个store能够flush的memstore大小至totalFlushableSize; // 2、初始化storeFlushCtxs,为每一个store创建相应的flush上下文信息StoreFlusherImpl实例,这些上下文实例携带了同一个刷新序列号 // 2、将每一个store相应的StoreFlushContext加入到ArrayList列表storeFlushCtxs中。实际生成的是StoreFlusherImpl实例 // 3、将每一个store相应的FamilyName加入到TreeMap集合committedFiles中。以备 // 3、初始化committedFiles:将每一个store相应的列名放置到committedFiles的key中。value临时为null for (Store s : stores.values()) { totalFlushableSize += s.getFlushableSize(); // 这里仅仅是构造一个StoreFlusherImpl对象,该对象仅仅有cacheFlushSeqNum一个变量被初始化为flushSeqId // 然后。加入到storeFlushCtxs列表 storeFlushCtxs.add(s.createFlushContext(flushSeqId)); committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL } // write the snapshot start to WAL // 在WAL中写一个刷新的開始标记,并获取一个事务ID if (wal != null) { // 事实上就是往WAL中append一条记录:row为Region所在的startKey, // family为METAFAMILY。 // qualifier为HBASE::FLUSH。 // value为FlushDescriptor FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushSeqId, committedFiles); trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock } // Prepare flush (take a snapshot) // 循环storeFlushCtxs。为每一个StoreFlushContext做准备工作,主要是生成memstore的快照 for (StoreFlushContext flush : storeFlushCtxs) { /** * 刷新前的准备工作 * 1、获取memstore的快照,并赋值到snapshot。 * 2、获取flush的数目。即待刷新cell数目,并赋值到cacheFlushCount; * 3、获取flush的大小。并赋值到cacheFlushSize; * 4、创建空的已提交文件列表,大小为1。 */ flush.prepare(); } } catch (IOException ex) { if (wal != null) { if (trxId > 0) { // check whether we have already written START_FLUSH to WAL try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, false); } catch (Throwable t) { LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + StringUtils.stringifyException(t)); // ignore this since we will be aborting the RS with DSE. } } // we have called wal.startCacheFlush(), now we have to abort it // 我们已经调用了wal的startCacheFlush()方法,如今我们不得不放弃它。 // 1、将Region名相应的SeqId从数据结构lowestFlushingRegionSequenceIds移回至oldestUnflushedRegionSequenceIds // 2、调用closeBarrier.endOp(),终止一个操作 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); throw ex; // let upper layers deal with it. } } finally { // 快照创建好后,释放写锁updatesLock this.updatesLock.writeLock().unlock(); } // 设置状态跟踪器的状态:完毕了memstore的snapshot创建 String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); // sync unflushed WAL changes // see HBASE-8208 for details if (wal != null) { try { wal.sync(); // ensure that flush marker is sync‘ed } catch (IOException ioe) { LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: " + StringUtils.stringifyException(ioe)); } } // wait for all in-progress transactions to commit to WAL before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. // 在我们能够開始flush之前等待全部进行中的事务提交到WAL。这能够防止未提交的事务被写入HFiles。

// 我们在開始刷新之前。不得不堵塞,否则通过一个rollbackMemstore被删除的keys可能被写入到Hfiles。 // 真正flush之前。先设置一个多版本号一致性控制器的写序号,值为本次flush的序列号 w.setWriteNumber(flushSeqId); // 然后,调用多版本号控制器的方法。等待其它的事务完毕 mvcc.waitForPreviousTransactionsComplete(w); // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block // 设置w为null,防止mvcc.advanceMemstore在finally模块再次被调用 w = null; // 设置状态跟踪器的状态:刷新stores进行中... s = "Flushing stores of " + this; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); } finally { if (w != null) { // in case of failure just mark current w as complete // 失败的情况下,标记当前w为已完毕 mvcc.advanceMemstore(w); } } // Any failure from here on out will be catastrophic requiring server // restart so wal content can be replayed and put back into the memstore. // Otherwise, the snapshot content while backed up in the wal, it will not // be part of the current running servers state. boolean compactionRequested = false; try { // A. Flush memstore to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. The new flushed file is still in the // tmp directory. // 循环storeFlushCtxs。对每一个StoreFlushContext运行刷新操作flushCache,将数据真正写入文件 for (StoreFlushContext flush : storeFlushCtxs) { // 调用HStore对象的flushCache()方法,将数据真正写入文件 flush.flushCache(status); } // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have // same order // 循环storeFlushCtxs,对每一个StoreFlushContext运行commit操作 for (StoreFlushContext flush : storeFlushCtxs) { boolean needsCompaction = flush.commit(status); if (needsCompaction) { compactionRequested = true; } committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles()); } storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. // 设置flush之后的memstore的大小 this.addAndGetGlobalMemstoreSize(-totalFlushableSize); if (wal != null) { // write flush marker to WAL. If fail, we should throw DroppedSnapshotException // 将flush标记写入WAL。同一时候运行sync FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, getRegionInfo(), flushSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, true); } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The wal needs to be replayed so its content is restored to memstore. // Currently, only a server restart will do this. // We used to only catch IOEs but its possible that we‘d get other // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. if (wal != null) { try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, false); } catch (Throwable ex) { LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + StringUtils.stringifyException(ex)); // ignore this since we will be aborting the RS with DSE. } wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); status.abort("Flush failed: " + StringUtils.stringifyException(t)); // Callers for flushcache() should catch DroppedSnapshotException and abort the region server. // However, since we may have the region read lock, we cannot call close(true) here since // we cannot promote to a write lock. Instead we are setting closing so that all other region // operations except for close will be rejected. this.closing.set(true); if (rsServices != null) { // This is a safeguard against the case where the caller fails to explicitly handle aborting rsServices.abort("Replay of WAL required. Forcing server shutdown", dse); } throw dse; } // If we get to here, the HStores have been written. if (wal != null) { // 调用WAL的completeCacheFlush()方法完毕MemStore的flush // 将Region相应的近期一次序列化ID从数据结构lowestFlushingRegionSequenceIds中删除 // 调用closeBarrier.endOp()终止一个操作 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } // Record latest flush time // 记录当前时间为上次flush时间 this.lastFlushTime = EnvironmentEdgeManager.currentTime(); // Update the last flushed sequence id for region. TODO: This is dup‘d inside the WAL/FSHlog. // 将本次flush序列号ID赋值给lastFlushSeqId this.lastFlushSeqId = flushSeqId; // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). // 最后唤醒等待memstore的线程 synchronized (this) { notifyAll(); // FindBugs NN_NAKED_NOTIFY } long time = EnvironmentEdgeManager.currentTime() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": ""); LOG.info(msg); // 设置状态追踪状态:完毕 status.setStatus(msg); // 返回flush结果 return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED : FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId); }


        3、假设没有能够刷新的缓存。直接返回。可是我们须要安全的更新Region的sequence id;
        4、设置状态跟踪器的状态:获取锁以堵塞并发的更新,即Obtaining lock to block concurrent updates。
        6、设置状态跟踪器的状态:正在准备通过创建存储的快照刷新。即Preparing to flush by snapshotting stores in...;
                10.2、 wal不为空的话。获取下一个序列号。赋值给flushSeqId。




















