Elasticsearch源码 节点关闭分析

Posted 衣舞晨风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch源码 节点关闭分析相关的知识,希望对你有一定的参考价值。

带着疑问学源码,第六篇:Elasticsearch 节点关闭分析
代码分析基于:https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

目的

在看源码之前先梳理一下,自己对于节点关闭流程疑惑的点:

  • 节点关闭都做了哪些检查?
  • kill ES进程来关闭节点是否安全?
  • 普通节点关闭与Master节点关闭有什么区别?
  • 正在写入数据的节点,在关闭的时候,会发生什么?

源码分析

在节点启动过程中,Bootstrap#setup方法中添加了shutdown hook,当进程收到系统SIGTERM(kill命令默认信号 15)或SIGINT(2)信号时,调用Node#close方法,执行节点关闭流程。

https://stackoverflow.com/questions/4042201/how-does-sigint-relate-to-the-other-termination-signals-such-as-sigterm-sigquit

每个模块的Service中都实现了doStop和doClose,用于处理这个模块的正常关闭流程。节点总的关闭流程位于Node#close,在close方法的实现中,先调用一遍各个模块的doStop,然后再次遍历各个模块执行doClose。

    // During concurrent close() calls we want to make sure that all of them return after the node has completed it's shutdown cycle.
    // If not, the hook that is added in Bootstrap#setup() will be useless:
    // close() might not be executed, in case another (for example api) call to close() has already set some lifecycles to stopped.
    // In this case the process will be terminated even if the first call to close() has not finished yet.
    @Override
    public synchronized void close() throws IOException 
        synchronized (lifecycle) 
            if (lifecycle.started()) 
                stop();
            
            if (!lifecycle.moveToClosed()) 
                return;
            
        

        logger.info("closing ...");

        // 关闭各种服务
        List<Closeable> toClose = new ArrayList<>();
        StopWatch stopWatch = new StopWatch("node_close");
        toClose.add(() -> stopWatch.start("node_service"));
        toClose.add(nodeService);
        toClose.add(() -> stopWatch.stop().start("http"));
        toClose.add(injector.getInstance(HttpServerTransport.class));
        toClose.add(() -> stopWatch.stop().start("snapshot_service"));
        toClose.add(injector.getInstance(SnapshotsService.class));
        toClose.add(injector.getInstance(SnapshotShardsService.class));
        toClose.add(injector.getInstance(RepositoriesService.class));
        toClose.add(() -> stopWatch.stop().start("client"));
        Releasables.close(injector.getInstance(Client.class));
        toClose.add(() -> stopWatch.stop().start("indices_cluster"));
        toClose.add(injector.getInstance(IndicesClusterStateService.class));
        toClose.add(() -> stopWatch.stop().start("indices"));
        toClose.add(injector.getInstance(IndicesService.class));
        // close filter/fielddata caches after indices
        toClose.add(injector.getInstance(IndicesStore.class));
        toClose.add(injector.getInstance(PeerRecoverySourceService.class));
        toClose.add(() -> stopWatch.stop().start("cluster"));
        toClose.add(injector.getInstance(ClusterService.class));
        toClose.add(() -> stopWatch.stop().start("node_connections_service"));
        toClose.add(injector.getInstance(NodeConnectionsService.class));
        toClose.add(() -> stopWatch.stop().start("discovery"));
        toClose.add(injector.getInstance(Discovery.class));
        toClose.add(() -> stopWatch.stop().start("monitor"));
        toClose.add(nodeService.getMonitorService());
        toClose.add(() -> stopWatch.stop().start("fsHealth"));
        toClose.add(injector.getInstance(FsHealthService.class));
        toClose.add(() -> stopWatch.stop().start("gateway"));
        toClose.add(injector.getInstance(GatewayService.class));
        toClose.add(() -> stopWatch.stop().start("search"));
        toClose.add(injector.getInstance(SearchService.class));
        toClose.add(() -> stopWatch.stop().start("transport"));
        toClose.add(injector.getInstance(TransportService.class));

        for (LifecycleComponent plugin : pluginLifecycleComponents) 
            toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
            toClose.add(plugin);
        
        toClose.addAll(pluginsService.filterPlugins(Plugin.class));

        toClose.add(() -> stopWatch.stop().start("script"));
        toClose.add(injector.getInstance(ScriptService.class));

        toClose.add(() -> stopWatch.stop().start("thread_pool"));
        toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
        // Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
        // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
        // awaitClose if the node doesn't finish closing within the specified time.

        toClose.add(() -> stopWatch.stop().start("gateway_meta_state"));
        toClose.add(injector.getInstance(GatewayMetaState.class));

        toClose.add(() -> stopWatch.stop().start("node_environment"));
        toClose.add(injector.getInstance(NodeEnvironment.class));
        toClose.add(stopWatch::stop);

        if (logger.isTraceEnabled()) 
            toClose.add(() -> logger.trace("Close times for each service:\\n", stopWatch.prettyPrint()));
        
        IOUtils.close(toClose);
        logger.info("closed");
    

    private Node stop() 
        if (!lifecycle.moveToStopped()) 
            return this;
        
        logger.info("stopping ...");

        injector.getInstance(ResourceWatcherService.class).close();
        injector.getInstance(HttpServerTransport.class).stop();

        injector.getInstance(SnapshotsService.class).stop();
        injector.getInstance(SnapshotShardsService.class).stop();
        injector.getInstance(RepositoriesService.class).stop();
        // stop any changes happening as a result of cluster state changes
        injector.getInstance(IndicesClusterStateService.class).stop();
        // close discovery early to not react to pings anymore.
        // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
        injector.getInstance(Discovery.class).stop();
        // we close indices first, so operations won't be allowed on it
        injector.getInstance(ClusterService.class).stop();
        injector.getInstance(NodeConnectionsService.class).stop();
        injector.getInstance(FsHealthService.class).stop();
        nodeService.getMonitorService().stop();
        injector.getInstance(GatewayService.class).stop();
        injector.getInstance(SearchService.class).stop();
        injector.getInstance(TransportService.class).stop();

        pluginLifecycleComponents.forEach(LifecycleComponent::stop);
        // we should stop this last since it waits for resources to get released
        // if we had scroll searchers etc or recovery going on we wait for to finish.
        injector.getInstance(IndicesService.class).stop();
        logger.info("stopped");

        return this;
    

各模块的关闭有一定的顺序关系,以 doStop 为例,按下表所示的 顺序调用各模块 doStop方法。

服务简介
ResourceWatcherService通用资源监视服务
HttpServerTransportHTTP 传输服务,提供REST接口服务
SnapshotsService快照服务
SnapshotShardsService负责启动和停止shard级快照
RepositoriesServiceService responsible for maintaining and providing access to snapshot repositories on nodes
IndicesClusterStateService收到集群状态信息后,处理其中索引相关操作
Discovery集群拓扑管理
ClusterService集群管理服务,主要处理集群任务,发布集群状态
NodeConnectionsService节点连接管理服务
FsHealthServiceRuns periodically and attempts to create a temp file to see if the filesystem is writable. If not then it marks the path as unhealthy
MonitorService提供进程级、系统级、文件系统和 JVM 的监控服务
GatewayService负责集群元数据持久化与恢复
SearchService处理搜索请求
TransportService底层传输服务
plugins当前的所有插件
IndicesService负责创建、删除索引等索引操作

综合来看,关闭顺序大致如下∶

  • 关闭快照和HTTPServer,不再响应用户REST请求。
  • 关闭集群拓扑管理,不再响应ping请求。
  • 关闭网络模块,让节点离线。
  • 执行各个插件的关闭流程。
  • 关闭IndicesService。
    最后才关闭IndicesService,是因为这期间需要等待释放的资源最多,时间最长。

下面着重看一下IndicesService的doStop:

    @Override
    protected void doStop() 
        clusterService.removeApplier(timestampFieldMapperService);
        timestampFieldMapperService.doStop();

        ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);

        ExecutorService indicesStopExecutor =
            Executors.newFixedThreadPool(5, daemonThreadFactory(settings, "indices_shutdown"));

        // Copy indices because we modify it asynchronously in the body of the loop
        final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet());
        final CountDownLatch latch = new CountDownLatch(indices.size());
        for (final Index index : indices) 
            indicesStopExecutor.execute(() -> 
                try 
                    removeIndex(index, IndexRemovalReason.SHUTDOWN, "shutdown");
                 finally 
                    latch.countDown();
                
            );
        
        try 
        	// 注意shardsClosedTimeout 这个值是在IndicesService的构造函数中初始化的
        	// this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
        	// 也就是说 CountDownLatch.await默认1天才会继续后面的流程
            if (latch.await(shardsClosedTimeout.seconds(), TimeUnit.SECONDS) == false) 
              logger.warn("Not all shards are closed yet, waited sec - stopping service", shardsClosedTimeout.seconds());
            
         catch (InterruptedException e) 
            // ignore
         finally 
            indicesStopExecutor.shutdown();
        
    

那什么时候会导致removeIndex执行一直无法返回呢?

IndicesService removeIndex(final Index index, final IndexRemovalReason reason, final String extraInfo)=>
IndexService close(final String reason, boolean delete)=>
IndexService removeShard(int shardId, String reason)=>
IndexService removeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener)=>
IndexShard close(String reason, boolean flushEngine)=>
Engine flushAndClose()=>

下面具体看一下flushAndClose():

    /**
     * Flush the engine (committing segments to disk and truncating the
     * translog) and close it.
     */
    public void flushAndClose() throws IOException 
        if (isClosed.get() == false) 
            logger.trace("flushAndClose now acquire writeLock");
            // 可以看一下:
            // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L857
            // 由于写入操作已经加了读锁,此时写锁会等待,直到写入执行完毕。
            // 因此数据写入过程不会被中断。但是由于网络模块被关闭,客户端的连接会被断开。
            // 客户端应当作为失败处理,虽然ES服务端的写流程还在继续。
            try (ReleasableLock lock = writeLock.acquire()) 
                logger.trace("flushAndClose now acquired writeLock");
                try 
                    logger.debug("flushing shard on close - this might take some time to sync files to disk");
                    try 
                        // TODO we might force a flush in the future since we have the write lock already even though recoveries
                        // are running.
                        flush();
                     catch (AlreadyClosedException ex) 
                        logger.debug("engine already closed - skipping flushAndClose");
                    
                 finally 
                    close(); // double close is not a problem
                
            
        
        awaitPendingClose();
    

总结

kill -15/-2 ElasticSearch是可以正常退出的。

正在写入数据的节点,在关闭的时候,需要等待数据写入完成或者超时。

主节点被关闭时,没有想象中的特殊处理,节点正常执行关闭流程,当TransportService模块被关闭后,集群重新选举新Master。因此,滚动重启期间会有一段时间处于无主状态。

该部分等看集群选举部分的时候,再细看一下。

以上是关于Elasticsearch源码 节点关闭分析的主要内容,如果未能解决你的问题,请参考以下文章

《Elasticsearch 源码解析与优化实战》第4章:节点启动和关闭

《Elasticsearch 源码解析与优化实战》第4章:节点启动和关闭

Elasticsearch源码 节点启动分析

Elasticsearch源码 节点启动分析

Elasticsearch之client源码简要分析

《Elasticsearch 源码解析与优化实战》第13章:Snapshot 模块分析