ES源码分析强制合并分段(_forcemerge API)源码分析
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ES源码分析强制合并分段(_forcemerge API)源码分析相关的知识,希望对你有一定的参考价值。
_forcemerge API 源码分析
文章目录
源码基于6.7.2
合并方式
RestForceMergeAction
中提供了对api的处理入口。在做了简单的参数处理之后,将请求转发给TransportForceMergeAction
进行进一步的处理,里面有一个分片级的方法shardOperation
。通过indicesService
找到对应的shard
,再执行InternalEngine.forceMerge
。
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException
assert indexWriter.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy : "MergePolicy is " +
indexWriter.getConfig().getMergePolicy().getClass().getName();
// 获取合并策略
ElasticsearchMergePolicy mp = (ElasticsearchMergePolicy) indexWriter.getConfig().getMergePolicy();
optimizeLock.lock();
try
ensureOpen();
if (upgrade)
logger.info("starting segment upgrade upgradeOnlyAncientSegments=", upgradeOnlyAncientSegments);
mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments);
store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
try
// 将合并分为,1.只合并删除文档 2.没有配置最大segment数的合并 3.配置了最大segment数的合并
if (onlyExpungeDeletes)
assert upgrade == false;
indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/);
else if (maxNumSegments <= 0)
assert upgrade == false;
indexWriter.maybeMerge();
else
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
if (flush)
if (tryRenewSyncCommit() == false)
flush(false, true);
if (upgrade)
logger.info("finished segment upgrade");
finally
store.decRef();
catch (AlreadyClosedException ex)
/* in this case we first check if the engine is still open. If so this exception is just fine
* and expected. We don't hold any locks while we block on forceMerge otherwise it would block
* closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures
* we are handling a tragic even exception here */
ensureOpen(ex);
failOnTragicEvent(ex);
throw ex;
catch (Exception e)
try
maybeFailEngine("force merge", e);
catch (Exception inner)
e.addSuppressed(inner);
throw e;
finally
try
// reset it just to make sure we reset it in a case of an error
mp.setUpgradeInProgress(false, false);
finally
optimizeLock.unlock();
通过上述代码可以发现,在ES实现的分段文件合并中,主要分为三类:
- 只合并删除文档
- 没有限制最大segment数的合并
- 限制了最大segment数的合并
只合并删除文档
这是一个阻塞和等待的合并。先进行将所有内存中缓冲的更新(添加和删除)刷新到目录,同时会去触发可能合并的逻辑,更新合并的等待列表。当调用 forceMergeDeletes 时,我们仅在其删除百分比超过此阈值时才合并掉一个段。 默认值为 10%。
IndexWriter.forceMergeDeletes
public void forceMergeDeletes(boolean doWait)
throws IOException
// 确保索引的分片是一个开启可用的状态
ensureOpen();
// 先将目前的内存中的缓存更新到物理文件中
flush(true, true);
if (infoStream.isEnabled("IW"))
infoStream.message("IW", "forceMergeDeletes: index now " + segString());
final MergePolicy mergePolicy = config.getMergePolicy();
// 这是一个静态内部类,定义了合并规范
MergePolicy.MergeSpecification spec;
boolean newMergesFound = false;
synchronized(this)
// 通过EsTieredMergePolicy类中的forcedMergePolicy对象去寻找可以合并的segment,在EsTieredMergePolicy实例化TieredMergePolicy时,把段文件大小从Lucene中默认的5GB改为了无限制。
spec = mergePolicy.findForcedDeletesMerges(segmentInfos, this);
newMergesFound = spec != null;
if (newMergesFound)
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++)
//检查此合并是否涉及已参与合并的段。 如果不是,则此合并是“注册的”,这意味着我们记录其段现在正在参与合并,并返回 true。 否则(合并冲突)返回 false。
registerMerge(spec.merges.get(i));
// 合并线程进行段文件的合并
mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);
if (spec != null && doWait)
final int numMerges = spec.merges.size();
synchronized(this)
boolean running = true;
while(running)
if (tragedy.get() != null)
throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMergeDeletes", tragedy.get());
// 检查 MergePolicy 要求我们执行的每个合并,以查看其中是否有任何仍在运行,以及是否有任何遇到异常。
running = false;
for(int i=0;i<numMerges;i++)
final MergePolicy.OneMerge merge = spec.merges.get(i);
if (pendingMerges.contains(merge) || runningMerges.contains(merge))
running = true;
Throwable t = merge.getException();
if (t != null)
throw new IOException("background merge hit exception: " + merge.segString(), t);
// If any of our merges are still running, wait:
if (running)
doWait();
// 注意:在 ConcurrentMergeScheduler 的情况下,当 doWait 为 false 时,我们可以在后台线程完成合并时立即返回
TieredMergePolicy.findForcedDeletesMerges
@Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeContext mergeContext) throws IOException
if (verbose(mergeContext))
message("findForcedDeletesMerges infos=" + segString(mergeContext, infos) + " forceMergeDeletesPctAllowed=" + forceMergeDeletesPctAllowed, mergeContext);
// First do a quick check that there's any work to do.
// NOTE: this makes BaseMergePOlicyTestCase.testFindForcedDeletesMerges work
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
boolean haveWork = false;
for(SegmentCommitInfo info : infos)
// 计算一个segment中被标识删除文档的个数
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
// 计算已删除文档数在segment中的百分比
double pctDeletes = 100.*((double) delCount)/info.info.maxDoc();
// forceMergeDeletesPctAllowed的默认值是10,即需要已删除文档数的占比超过10%,才可以合并
if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info))
haveWork = true;
break;
if (haveWork == false)
return null;
// 当我们在这里运行时,大小可以并发地改变,因为删除现在是并发地应用的,这可能会导致TimSort失败!因此,我们在每个段调用size()一次,并根据它排序:
List<SegmentSizeAndDocs> sortedInfos = getSortedBySegmentSize(infos, mergeContext);
Iterator<SegmentSizeAndDocs> iter = sortedInfos.iterator();
while (iter.hasNext())
SegmentSizeAndDocs segSizeDocs = iter.next();
double pctDeletes = 100. * ((double) segSizeDocs.delCount / (double) segSizeDocs.maxDoc);
// 如果此段正在合并或者目前的已删除文档占比低于阈值则不对此段文件进行合并
if (merging.contains(segSizeDocs.segInfo) || pctDeletes <= forceMergeDeletesPctAllowed)
iter.remove();
if (verbose(mergeContext))
message("eligible=" + sortedInfos, mergeContext);
// maxMergeAtOnceExplicit显示默认一次合并的最大段数 默认30
// maxMergedSegmentBytes 段的最大大小值
return doFindMerges(sortedInfos, maxMergedSegmentBytes,
maxMergeAtOnceExplicit, Integer.MAX_VALUE, 0, MERGE_TYPE.FORCE_MERGE_DELETES, mergeContext, false);
TieredMergePolicy.doFindMerges
private MergeSpecification doFindMerges(List<SegmentSizeAndDocs> sortedEligibleInfos,
final long maxMergedSegmentBytes,
final int mergeFactor, final int allowedSegCount,
final int allowedDelCount, final MERGE_TYPE mergeType,
MergeContext mergeContext,
boolean maxMergeIsRunning) throws IOException
List<SegmentSizeAndDocs> sortedEligible = new ArrayList<>(sortedEligibleInfos);
Map<SegmentCommitInfo, SegmentSizeAndDocs> segInfosSizes = new HashMap<>();
for (SegmentSizeAndDocs segSizeDocs : sortedEligible)
segInfosSizes.put(segSizeDocs.segInfo, segSizeDocs);
int originalSortedSize = sortedEligible.size();
if (verbose(mergeContext))
message("findMerges: " + originalSortedSize + " segments", mergeContext);
if (originalSortedSize == 0)
return null;
final Set<SegmentCommitInfo> toBeMerged = new HashSet<>();
MergeSpecification spec = null;
// 循环以可能选择多个合并:
// 索引中全部删除文档的触发点同时导致了一堆大段合并。 因此,在每个周期的合并列表中只放置一个大合并。 下次我们将进行另一个合并。
boolean haveOneLargeMerge = false;
while (true)
// Gather eligible segments for merging, ie segments
// not already being merged and not already picked (by
// prior iteration of this loop) for merging:
// 删除不合格的片段。 这些要么已经被合并,要么已经被先前的迭代选中
Iterator<SegmentSizeAndDocs> iter = sortedEligible.iterator();
while (iter.hasNext())
SegmentSizeAndDocs segSizeDocs = iter.next();
if (toBeMerged.contains(segSizeDocs.segInfo))
iter.remove();
if (verbose(mergeContext))
message(" allowedSegmentCount=" + allowedSegCount + " vs count=" + originalSortedSize + " (eligible count=" + sortedEligible.size() + ")", mergeContext);
if (sortedEligible.size() == 0)
return spec;
final int remainingDelCount = sortedEligible.stream().mapToInt(c -> c.delCount).sum();
// allowedSegCount每层分段的数量,如果合并类型是NATURAL,当需要处理的段小于每层允许的分段数量和剩余删除的文档小于允许删除文档阈值,则这批次合并选择结束
if (mergeType == MERGE_TYPE.NATURAL &&
sortedEligible.size() <= allowedSegCount &&
remainingDelCount <= allowedDelCount)
return spec;
// OK we are over budget -- find best merge!
MergeScore bestScore = null;
List<SegmentCommitInfo> best = null;
boolean bestTooLarge = false;
long bestMergeBytes = 0;
for (int startIdx = 0; startIdx < sortedEligible.size(); startIdx++)
long totAfterMergeBytes = 0;
final List<SegmentCommitInfo> candidate = new ArrayList<>();
boolean hitTooLarge = false;
long bytesThisMerge = 0;
// 合并因子mergeFactor一次合并的段的数量
for (int idx = startIdx; idx < sortedEligible.size() && candidate.size() < mergeFactor && bytesThisMerge < maxMergedSegmentBytes; idx++)
final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
final long segBytes = segSizeDocs.sizeInBytes;
if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes)
// 超过最大限制值时,将此次合并定义为超大合并
hitTooLarge = true;
if (candidate.size() == 0)
// We should never have something coming in that _cannot_ be merged, so handle singleton merges
candidate.add(segSizeDocs.segInfo);
bytesThisMerge += segBytes;
// 注意:我们继续,以便我们可以尝试将较小的段“打包”到此合并中,看看我们是否可以接近最大大小; 这通常并不完美,因为这实际上是“装箱”,我们必须尝试不同的排列。
continue;
candidate.add(segSizeDocs.segInfo);
bytesThisMerge += segBytes;
totAfterMergeBytes += segBytes;
// We should never see an empty candidate: we iterated over maxMergeAtOnce
// segments, and already pre-excluded the too-large segments:
assert candidate.size() > 0;
// 单例的合并如果不处理删除标识的文档,那么意义不大,当执行forceMerge时可以触发这个循环
if (candidate.size() == 1)
SegmentSizeAndDocs segSizeDocs = segInfosSizes.get(candidate.get(0));
if (segSizeDocs.delCount == 0)
continue;
// 如果我们没有找到一个太大的合并并且有一个长度小于合并因子的候选列表,这意味着我们到达了段列表的尾部并且只会找到更小的合并。 此轮寻找需要合并的端的过程到此结束
if (bestScore != null &&
hitTooLarge == false &&
candidate.size() < mergeFactor)
break;
// 对此次寻找的结果进行打分
final MergeScore score = score(candidate, hitTooLarge, segInfosSizes);
if (verbose(mergeContext))
message(" maybe=" + segString(mergeContext, candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes/1024./1024.), mergeContext);
<以上是关于ES源码分析强制合并分段(_forcemerge API)源码分析的主要内容,如果未能解决你的问题,请参考以下文章