事件驱动的HLog写入模型

Posted bryceforphy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了事件驱动的HLog写入模型相关的知识,希望对你有一定的参考价值。

HLog的作用:

HBase写入数据时会同时写入到WAL和Memstore中,其中Memstore是位于内存中的store,类似于写缓存,当Memstore的大小超过限定的阈值时会触发flush行为,将内存中的数据刷写到磁盘做持久化。其中的wal也称为hlog,作用类似于mysql中的binlog,记录了客户端的每次update动作,只有当wal写入成功之后,本次写事务才会返回。

我们知道内存中的数据是易失的,当regionserver宕机时,HMaster会切割按region切割宕机regionserver上的hlog,并将它分发到region被迁移到的新regionserver上以恢复该在memstore中还未来得及刷盘的数据。相应地如果hlog里面的记录已经完成了flush,则该hlog会被regionServer移动到.oldlog目录下,由HMaster上的定时线程LogCleaner周期性地扫描该目录,删除掉不再使用的hlog。

此外hlog还有一个意义就是用于hbase的replication,hbase的replication是通过将主集群的hlog推送到备集群,然后在备集群上reply来实现的。这个推送过程是异步完成的,因此会存在.oldlog目录下的hlog还未被replication推送完成的情况,此时HMaster会将这些未推送完成的hlog记录在zk上。方便在清除.oldlog目录时跳过有zk指向的hlog文件。

WALFactory类:

先从WALFactory开始分析,HRegionServer中管理着一个WALFactory变量,定义的格式如下:

protected volatile WALFactory walFactory

下面分析一下walFactory在regionserver中的应用姿势,首先在RegionServer中维护着一个与Master之间的心跳逻辑,这段代码在RegionServer的主循环run()里,如下所示:

while(keepLooping()) {
	RegionServerStartupResponse w = repartForDuty();
	if (w == null) {
		this.sleeper.sleep();
	} else {
		handleReportForDutyResponse(w);
		break;
	}
}
reportForDuty是RegionServer向master上报注册信息,master会回应一个key/value格式的信息给regionserver,以标识regionserver本次register成功。

regionserver收到master的回应消息后,开始调用handleReportForDutyResponse,这个函数的主要逻辑列在下面:

try{
     根据master的返回值处理hostname逻辑;
     ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
     
     this.walFactory = setupWALAndReplication();          //启动walFactory
     初始化metricsRegionServer;
     startServiceThreads();                              //启动各路服务线程;
     startHeapMemoryManager();                   //启动内存管理了;
                         
     synchronized (online) {
          online.set(true);
          online.notifyAll();
     }
}

setupWALAndReplication创建并返回了WALFactory类,setupWALAndRepliaction中的几个主要步骤摘录如下:

private WALFactory setupWALAndReplication() throws IOException {
     final Path oldLogDir = new Paht(……..)               //获取old WAL日志的路径
     final String logName=DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
               
     Path lodger = new Path(rootDir,  logName);
               
     //如果设置了replication相关,初始化replication manager
     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);

     listeners添加,添加了WALActionsListener;
     final List<WALActionsListener> listeners = new ArrayList<>();
     return new WALFactory(conf, listeners, serverName.toString);
}
WALFactory的构造函数中除了设置超时时间等之外,还初始化了一个DefaultWALProvider类型的变量,几乎所有与wal文件操作相关的方法都定义在这个接口类中。它里面分别定义了Reader&Writer,用于对WAL文件的读写。WALFactory中提供了两个接口createReader&createWriter,实际也是初始化了DefaultWALProvider中的这两个类。Reader&Writer实现了对hlog文件的读写。

DefaultWALProvider中还包括了一个FSHLog类型的成员变量,FSHLog管理了将WAL持久化的线程模型。下面详细分析FSHLog的实现。

FSHLog与HLog写入模型:

首先从hbase的写入路径入手分析,前面分析过客户端的put操作在服务端最终调用的是doMiniBatchMutation。数据在被成功写入到memstore之后,会收集此次写入动作的table name、region info等信息并构造成一个HlogKey结构的对象记为walkey,并将当前写入的数据作为walEdit,然后将walkey和walEdit共同组装成一个entry之后将之append到内存中一个ringbuffer类型的缓冲区中。返回值txid用于标识本次写事务在缓冲区的写入序号。

if (walEdit.size() > 0) {
   walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
         this.htableDescriptor.getTableName(), now, m.getClusterIds(),
         currentNonceGroup, currentNonce);
   txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
         walEdit, getSequenceId(), true, null);
   walEdit = new WALEdit(isInReplay);
   walKey = null;
}
待所有的锁释放之后,再将buffer中的数据刷写到磁盘,最后将版本号向前推进,提交本次写事务,其中刷写磁盘的代码如下:
if(taxied != 0) {
    syncOrDefer(txid, durabiliby);
}

syncOrDefer会根据客户端设置的持久化级别选择是否将日志数据落盘,其中client可以选择的wal持久化等级划分为如下四个等级:

SKIP_WAL:数据只写memstore,不写Hlog,此时写入性能最高,但是数据有丢失风险;

ASYNC_WAL:异步将数据写入HLog文件中;

SYNC_WAL:同步将数据写入日志文件,但是此时数据只是被写入文件系统缓存,并没有真正落盘;

FSYNC_WAL:同步将数据写入日志文件并强制落盘,可以保证数据不丢失,但是性能最差;

客户端可以通过如:put.setDurability(Durability.SYNC_WAL)设置WAL持久化级别,不设置时默认是SYNC_WAL。syncOrDefer通过调用this.wal.sync(txid)将数据落盘,

经过前面的分析,可以看到hlog的写入最终是调用了FSHLog向外暴露的append()&sync()两个方法来实现的,可见FSHLog中实现了hlog的写线程模型。因此想要分析写线程模型的实现,分析的入口就在上面两个方法,在具体分析方法细节前,先看看构造FSHLog时都初始化了哪些变量:

this.appendExecutor = Executors.
   newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
    
final int preallocatedEventCount =
   this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
this.disruptor =
   new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
       this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
this.disruptor.getRingBuffer().next();
this.ringBufferEventHandler =
   new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
       maxHandlersCount);
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
this.disruptor.start();

其中最关键的就是变量disruptor,它是一个Disruptor<RingBufferTruck>类型的成员变量,Distuptor是LMAX开发的一个高性能无锁队列,本质还是个生产者-消费者模型,它采用一个环形数组结构,称为RingBuffer来复用内存,同时Buffer上的读写序列号经过优化可以避免伪共享,多线程并发访问该序列号时通过CPU级别的CAS自旋来获得,以此实现了lock free。这样只要buffer中有事件就会被递交到消费者线程池去处理。

回头看disruptor构造函数中的各参数的含义,首先第一个参数指定了通过Disruptor交换的事件类型,这里定义为RingBufferTruck类型,参数EVENT_FACTORY指代事件工厂,用于Disruptor通过该工厂在RingBuffer中预创建Event实例。参数preallocatedEventCount指定了ringBuffer的大小。ProducerType是数据生产方式,客户端写入数据时,调用this.wal.append()方法实际上就是以生产者的身份将数据写入到RingBuffer中。append关键代码如下:

public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
      final List<Cell> memstoreCells) throws IOException {
    
    FSWALEntry entry = null;
    long sequence = this.disruptor.getRingBuffer().next();
    try {
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
      
      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);  //用key和edits构造一个对象entry
      truck.loadPayload(entry, scope.detach());   //将上面构造的对象包装为RingBufferTruck事件并添加到Ring Buffer
    } finally {
      this.disruptor.getRingBuffer().publish(sequence);
    }
    return sequence;
}
消费者线程由appendExecutor指定,这里用到的是newSingleThreadExecutor定义的单线程线程。BlockingWaitStrategy()指定了consumer的等待策略。appendExecutor并不处理具体的event,而是从Ringbuffer中接收之后将它转交给后端的ringBufferEventHandler来处理,因为appendExecutor中不包含事件处理逻辑,所以非常轻量,只需一个线程就可以处理生产端高并发的请求。

真正的事件处理在ringBufferEventHandler中完成,如下面定义,hbase中默认是5个handler:

this.ringBufferEventHandler = new 
	RingBufferEventHandler(conf.getInt(“hbase.regionserver.hlog.syncer.count”,5), maxHandersCount);

从RingBufferEventHandler起分析event的处理逻辑,每个RingBufferEventHandler中定义了两组主要的线程数组,如下所示:

class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
    private final SyncRunner [] syncRunners;
    private final SyncFuture [] syncFutures;
    
    //其它变量
}

每接收到RingBufferTruck事件,RingBufferEventHandler便会调用onEvent对该事件进行处理,主要的处理逻辑代码列在了下面:

public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
    throws Exception {
      
      try {
        if (truck.hasSyncFuturePayload()) {
          this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();        
          if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
        } else if (truck.hasFSWALEntryPayload()) {
          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
          try {
            append(truck.unloadFSWALEntryPayload());
          } catch (Exception e) {
            。。。。
          } 
        } else {
          return;
        }

        int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
        try {
          this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
        } catch (Exception e) {
          cleanupOutstandingSyncsOnException(sequence, e);
          throw e;
        }
        attainSafePoint(sequence);
        this.syncFuturesCount = 0;
      } catch (Throwable t) {
        LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
      }
}

RingBufferTruck中封装可能封装了两种不同类型的对象,分别是WALEntry或者SyncFuture,消费线程的执行方法onEvent()中对上述对象的处理也不同,如果是WALEntry,则调用append方法,使用writer将输入的WALEntry经protobuf序列化后写入hadoop文件。如果是SyncFuture,则把该对象放入RingBufferEventHandler自身维护的SyncFutures[]数组中。

然后,onEvent从syncRunners[]中取出一个线程,并调用它的offer方法,offer中将该EventHandler中的所有syncFuture添加到SyncRunner自身维护的阻塞队列中,在syncRunner线程的run方法里等到写满一批syncFuture之后,会调用writer.sync()将数据落盘,待数据成功刷到磁盘后,释放syncFuture,并将其中的scope置位。之所以如此设计,是因为对比客户端的屡次append操作,刷盘是相对比较耗时的,以此采用写文件缓存并结合异步刷盘的方式平衡对client端API的友好和客户端写吞吐。run方法简化后的主干代码如下:

public void run() {
      long currentSequence;
      while (!isInterrupted()) {
        int syncCount = 0;
        SyncFuture takeSyncFuture;
        try {
          while (true) {
            takeSyncFuture = this.syncFutures.take();
            currentSequence = this.sequence;
            long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
            long currentHighestSyncedSequence = highestSyncedSequence.get();
            if (currentSequence < currentHighestSyncedSequence) {
              syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
              continue;
            }
            break;
          }
         
          try {
            writer.sync();
            currentSequence = updateHighestSyncedSequence(currentSequence);
          } catch (IOException e) {
            。。。。。
          } catch (Exception e) {
            。。。。。
          } finally {
            takeSyncFuture.setSpan(scope.detach());
            syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t);
            syncCount += releaseSyncFutures(currentSequence, t);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (Throwable t) {
          LOG.warn("UNEXPECTED, continuing", t);
        }
      }
    }
}

需要注意的是writer.sync()的预处理,其取出当前已处理的最大sequence与本次待处理syncFuture中的sequence相对比,sequence是按照事务提交的顺序递增赋值的,事务append到缓存的顺序也是与sequence的赠序一致,如果当前sequence小于最大已提交sequence,则表明hlog中已写入相应记录,因此调用releaseSyncFuture()释放syncFuture。

还有一个问题,syncFuture中scope的置位是什么时候来查的呢,答案就是FSHLog向外暴露的this.wal.sync(txid)方法,客户端写操作调用sync后会阻塞等待数据刷盘成功,sync中调用syncFuture的get后阻塞在文件系统的同步操作上,当文件系统将数据落盘完成之后,get方法返回,并将syncFuture中置位的scope返回给客户端。客户端工作线程被唤醒,返回继续写入memstore,完成一次写操作。

private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
    try {
      syncFuture.get();
      return syncFuture.getSpan();
    } catch (InterruptedException ie) {
      。。。。
    } catch (ExecutionException e) {
      。。。。
    }
}

总而言之,WAL的写入模型是一个多消费者单生产者模型,生产者调用的方法append(),将包装好的WALEdit写入到线程安全的消息队列RingBuffer,同时只有一个消费者从这个队列中拉取数据并调用sync()方法把数据异步刷写到磁盘,单消费者保证了WAL日志并发写入时日志的全局顺序唯一,同时采用无锁队列Disruptor RingBuffer保证了写入端(生产者)的高吞吐低延时。

LogRoller:

LogRoller是个定期执行的线程。每个RegionServer中都有一个LogRoller线程,线程执行的周期由hbase.regionserver.logroll.period给出,默认时间是1hr。也就是说每过一个小时会产生一个新的hlog文件,hlog的文件名由regionserver名称+hlog形成时的时间戳构成。

LogRoller的run方法中的主要流程如下面列出:

rollLock.lock();
try {
     for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
          final WAL wal = entry.getKey();
          final byte [][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
          walNeedsRoll.put(wal, Boolean.FALSE);
          if (regionsToFlush != null) {
               for (byte [] r : regionsToFlush) scheduleFlush(r);
          }
     }
} finally {
     try {
          rollLog.set(false); 
     } finally {
          rollLock.unlock();
     }
}
在第四行中调用了HLog的rollWriter,rollWriter中会打开一个新的hdfs文件供log写入,并将old的hlog文件关闭。


以上是关于事件驱动的HLog写入模型的主要内容,如果未能解决你的问题,请参考以下文章

事件驱动模型

11.python并发入门(part13 了解事件驱动模型))

Python并发编程-事件驱动模型

事件驱动模型

事件驱动模型

python入门第三十四天--事件驱动模型