MemStore刷写线程—MemStoreFlusher源代码分析
Posted skyWalker_ONLY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MemStore刷写线程—MemStoreFlusher源代码分析相关的知识,希望对你有一定的参考价值。
在HBase中表由一个或多个Region组成,而Region由一个或者多个Store组成,Store又由一个MenStore和若干个StoreFile组成。无论是向HBase写入数据还是请求读数据,都首先经过MemStore,对于写请求来说就是将数据直接写入MemStore,对于读请求来说就是先检查MenStore中是否包含相应的数据,如果有则直接读取该数据,否则在StoreFile中检索并读取数据。当写入MemStore中的数据达到一定的数量时,就需要将其中的数据刷写到StoreFile中,这是由后台线程自动完成的,负责该任务的主要Java类是MemStoreFlusher,本篇文章将结合该类的源代码学习HBase如何决定是否刷写数据到StoreFile中的,而不关注MemStoreFlusher如何被实例化及启动的(相应的代码位于HRegionServer.java中)以及写入StoreFile的代码,而仅仅关注HBase在满足什么条件的情况将触发刷写。
官方文档将MenStoreFlusher解释为一个线程,而实际上该类既没有继承自Thread也没有实现Runnable接口,但定义了FlushHandler内部类,该类继承自HasThread,而HasThread实现了Runnable接口,在稍后的学习中将会看到FlushHandler就是负责刷写数据到StoreFile中的线程,并会根据参数hbase.hstore.flusher.count的值(默认为2)决定启动该线程的数量。该参数具有比较重要的意义,如果该值较小,则会导致刷写队列中包含过多的待刷写的Region,而如果该值较大的话,则会有较多并行执行刷写的线程,则会增加HDFS的负担,进而引起HBase更加频繁地执行Compaction。在MenStoreFlusher中刷写队列是由下面的数据结构定义的:
private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<FlushQueueEntry>();
private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap<HRegion, FlushRegionEntry>();
这两个数据结构必须一起使用,若某个FlushQueueEntry在二者中的一个,则在另一个中也必须能够找到该FlushQueueEntry。注意FlushRegionEntry实现了FlushQueueEntry接口,而后者继承自java.util.concurrent.Delayed接口。另一个实现了FlushQueueEntry的类为WakeupFlushThread类,该类的主要用作占位符插入到刷写队列中以确保刷写线程不会休眠。FlushRegionEntry类保存了请求刷写的Region、重试次数以及在刷写队列中存在的时间。刷写队列flushQueue的类型为DelayQueue,保存在该队列中的对象必须实现了Delayed接口,且位于该队列中的对象只有在其延迟过期后才可以被取出,位于队列头部的是延迟过期最长的对象,如果没有延迟过期,队列没有头部,poll操作将返回null。当调用对象的getDelay(TimeUnit.NANOSECONDS)方法返回值小于等于0时,该对象的延迟过期。由于DelayQueue实现了BlockingQueue接口,该接口是线程安全的,因此该队列也就支持BlockingQueue具有的阻塞入列和出列,即当队列为空时,将等待队列直到队列不为空再提取对象,当队列为满时,将等待队列有空闲位置时再插入对象。FlushRegionEntry的该方法实现如下:
@Override
public long getDelay(TimeUnit unit)
return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS);
FlushRegionEntry除了实现getDelay方法外,还定义了requeue()方法:
public FlushRegionEntry requeue(final long when)
this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
this.requeueCount++;
return this;
稍后会介绍为什么会使用DelayQueue这样的数据结构定义刷写队列,现在继续看MemStoreFlusher的源代码。在MemStoreFlusher的构造函数中,读取hbase-site.xml中设置的与刷写相关的参数并赋值给相应的变量,并实例化了刷写线程FlushHandler:
public MemStoreFlusher(final Configuration conf, final HRegionServer server)
super();
this.server = server;
//hbase.server.thread.wakefrequency,默认值10s
this.threadWakeFrequency =conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
//hbase.regionserver.global.memstore.size的值,默认为0.4,且该值必须小于等于0.8,大于0,即(0.0,0.8]
float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
//hbase.regionserver.global.memstore.size.lower.limit
this.globalMemStoreLimitLowMarkPercent = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf,globalMemStorePercent);
//RS Xmx * hbase.regionserver.global.memstore.size * hbase.regionserver.global.memstore.size.lower.limit
this.globalMemStoreLimitLowMark = (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
//默认值为90s,如果任何一个Store总的StoreFile数量大于hbase.hstore.blockingStoreFiles值,HRegion将阻塞更新直到该参数
//设置的时间到达或者Compaction完成。在该参数设置的时间到期后,即使Compaction没有完成,HRegion也将不再阻塞更新
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
this.flushHandlers = new FlushHandler[handlerCount];
MemStoreFlusher定义了三个刷写方法,分别为:flushOneForGlobalPressure、flushRegion(final FlushRegionEntry fqe)和flushRegion(finalHRegion region, final boolean emergencyFlush)其中前两个方法最终调用了最后一个方法,也就是只有最后一个方法执行了实际的刷写操作,前两个方法都在FlushHandler刷写线程中调用,分别用于当RegionServer中的所有MemStore的大小超过了RS Xmx *hbase.regionserver.global.memstore.size *hbase.regionserver.global.memstore.size.lower.limit进行的刷写和当某个Region需要执行刷写操作。
首先看看flushOneForGlobalPressure方法的作用,从该方法的名称就可以看出,该方法用于缓解全局MemStore缓存的压力,在FlushHandler刷写线程的run方法中,当刷写队列中没有要求立刻执行刷写的Region时,刷写线程会判断所有MemStore的大小是否大参数设置的阈值,如果是,则执行该方法。在该方法中,刷写是按照Region的大小进行的,但不一定对最大的Region执行刷写,还需要进一步判断。判断主要是由方法getBiggestMemstoreRegion(SortedMap<Long, HRegion> regionsBySize, Set<HRegion>excludedRegions, boolean checkStoreFileCount)完成的,该方法的第三个参数决定是否检查当前Region的StoreFile文件数量(决定是否执行Compact操作),如果当前Region的StoreFile数量超过了参数hbase.hstore.blockingStoreFiles设置的值(默认值为10),且checkStoreFileCount的值为true,则当前Region不执行刷写,其它不执行刷写的情况还包括当前Region正在执行刷写或者当前Region不可写。在flushOneForGlobalPressure中,对getBiggestMemstoreRegion连续调用两次,一次检查StoreFile的数量,一次不检查,这样的话,不检查StoreFile数量的调用返回的Region的大小总是大于等于检查StoreFile数量的调用返回的Region,在源代码中,前者声明为bestAnyRegion,后者声明为bestFlushableRegion。如果当前Region包含有过多的StoreFile,则bestFlushableRegion的大小小于当前Region,否则bestAnyRegion和bestFlushableRegion都为当前Region,因而二者也相等。如果bestFlushableRegion不为null(bestAnyRegion肯定不为null),且拥有过多StoreFile的bestAnyRegion的大小比bestFlushableRegion大2倍多,则选择bestAnyRegion为要刷写的Region,这是因为如果选择较小的bestFlushableRegion为要刷写的Region则会导致产生较多较小的StoreFile,从而引起Compaction。如果bestFlushableRegion为null,则bestAnyRegion为要刷写的Region,如果bestFlushableRegion且其大小大于等于bestAnyRegion的一半,则bestFlushableRegion为要刷写的Region。在确定了要执行刷写的Region后,将会调用flushRegion(regionToFlush, true)方法执行刷写操作。如果刷写不成功,则重复上述的步骤,并将当前执行刷写的Region排除在判断挑选之外。
在刷写线程中调用的另一个刷写方法是flushRegion(final FlushRegionEntry fqe),该方法用于检查指定的Region是否有过多的StoreFile,如果否的话则调用flushRegion(finalHRegion region, final boolean emergencyFlush)执行刷写,否则的话再根据该Region是否等待了足够长的时间来决定执行刷写还是重新入列。这里足够长的时间指的是参数hbase.hstore.blockingWaitTime设置的值(默认为90s),该参数用于设置阻塞Region更新的时间,当Region的StoreFile数量超过了参数hbase.hstore.blockingStoreFiles设置的值,Region将阻塞更新而执行Compaction操作,当Compaction完成或者阻塞的时间超过了hbase.hstore.blockingWaitTime设置的值,Region将不再阻塞更新。如果该Region等待了足够的时间,即使该Region拥有过多的StoreFile,也将会对该Region执行刷写操作。如果该Region还没有等待hbase.hstore.blockingWaitTime设置的值,则先判断该Region重新入列的次数,如果该Region是第一次从刷写队列中取出(即该队列没有重复入列),再判断该Region是否需要执行split操作,如果不需要执行split操作,则请求执行Compaction(由于存在太多StoreFile)。对于没有等待足够长时间的Region,将该队列重新入列(此时重新入列的次数将会加1),并设置延迟时间为百分之一hbase.hstore.blockingWaitTime的值,这样该Region至少要在刷写队列中等待百分之一hbase.hstore.blockingWaitTime才能够被再次取出,进而给合并线程时间来完成对StoreFile的合并,当该Region再次有机会执行刷写时,就有可能StoreFile的数量降低到hbase.hstore.blockingStoreFiles之下,进而可刷写数据到StoreFile中。从对该方法的描述中,可以知道为什么使用DelayQueue做为刷写队列的实现,就是为了让刷写队列中的Region必须等待设置的时间,以让合并线程有时间完成合并StoreFile的任务。
上述两个方法都没有实现实际的刷写任务,而将刷写任务交给了flushRegion(final HRegion region, finalboolean emergencyFlush)方法。该方法的第二个参数的值由调用者决定,在上述的第一个方法中为true(因为所有MemStore的大小已经超过了设置的阈值),第二个方法中为false(因为仅仅是某个Region需要执行刷写操作)。如果第二个方法的参数为true,则将该Region从刷写队列中移除,如果为false,则DelayQueue的poll方法移除该Region。在该方法中调用了HRegion的flushcache方法(此处不关心具体是如何刷写数据到StoreFile,仅关注MemStoreFlusher中的实现,后续会详细研究flushcache的具体实现),flushcache的返回结果为HRegion.FlushResult,可能的值为:FLUSHED_NO_COMPACTION_NEEDED、FLUSHED_COMPACTION_NEEDED、CANNOT_FLUSH_MEMSTORE_EMPTY、CANNOT_FLUSH,接着根据返回结果判断是否要执行Compaction和Split,只能执行二者中的一种操作,无论Compaction还是Split都是由CompactSplitThread类完成的。当刷写成功完成后,将会记录刷写的完成时间。
在MemStoreFlusher中还有一个方法用于阻塞Region的更新,该方法就是reclaimMemStoreMemory。该方法将检查当前RegionServer中所有MemStore的大小是否超过了参数hbase.regionserver.global.memstore.size设置的值,如果超过了该值,则会阻塞更新5秒钟,当但并不影响刷写线程的执行。由于当所有MemStore的值高于MemStore的最高水位时自然大于MemStore的低水位,所以在刷写线程的run方法中同样会调用flushOneForGlobalPressure方法以选择合适的Region并对其所拥有的MemStore进行刷写。每当对Region进行修改操作时都将调用reclaimMemStoreMemory方法,这将导致检查所有MemStore的大小,并在条件满足时阻塞更新操作。
总结,本篇文章仅基于 MemStoreFlusher的源代码及少量其他类的源代码对刷写线程及所涉及的方法进行了详细学习,仅包含了部分触发刷写MemStore的原因,比如当所有的MemStore的大小超过低水位或者最高水位的情况,以及对单个Region执行刷写的情况,对后者有一点需要补充的时,按照Hbase Reference Guide及hbase-default.xml的说明,当Region的某个MemStore大于hbase.hregion.memstore.flush.size的值时,会对该Region执行刷写,而从HRegion的源代码中发现是当前Region中所有的MemStore的大小超过了该参数的时请求刷写该Region,这一点还需要再进一步阅读相关源代码来确定真实情况。以上是关于MemStore刷写线程—MemStoreFlusher源代码分析的主要内容,如果未能解决你的问题,请参考以下文章