Elasticsearch Source Code: Node Startup Analysis

Posted by 衣舞晨风

Learning source code with questions in mind, part 5: Elasticsearch node startup analysis
The code analysis is based on: https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

Goals

Before diving into the source code, let's lay out my questions about the node startup flow:

  • What checks does a node perform at startup?
  • What does a node initialize at startup?
  • After a node starts, where is data migration handled?

Source Code Analysis

First, locate the entry class from the startup script: org.elasticsearch.bootstrap.Elasticsearch.

Let's look at org.elasticsearch.bootstrap.Elasticsearch, starting with its main entry point:

    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(final String[] args) throws Exception {
        // Read es.networkaddress.cache.ttl and es.networkaddress.cache.negative.ttl from jvm.options
        // and use them to override the JVM security properties networkaddress.cache.ttl
        // and networkaddress.cache.negative.ttl
        overrideDnsCachePolicyProperties();
        /*
         * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
         * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
         * forces such policies to take effect immediately.
         */
        System.setSecurityManager(new SecurityManager() {

            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }

        });
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        // The core checks happen in main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal)
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            final String basePath = System.getProperty("es.logs.base_path");
            // It's possible to fail before logging has been configured, in which case there's no point
            // suggesting that the user look in the log file.
            if (basePath != null) {
                Terminal.DEFAULT.errorPrintln(
                    "ERROR: Elasticsearch did not exit normally - check the logs at "
                        + basePath
                        + System.getProperty("file.separator")
                        + System.getProperty("es.logs.cluster_name") + ".log"
                );
            }
            exit(status);
        }
    }
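
As an aside, the DNS cache override at the top of main boils down to copying two es.* system properties (set in jvm.options) into the JVM-wide security properties that control DNS caching; networkaddress.cache.ttl is a security property rather than a system property, which is why it has to go through java.security.Security. A minimal standalone sketch of the idea — the property names and the 60s/10s defaults follow Elasticsearch's documented behavior, but the class and structure here are illustrative, not the actual overrideDnsCachePolicyProperties implementation:

    import java.security.Security;

    public final class DnsCacheOverrideSketch {
        public static void main(String[] args) {
            // Values typically arrive via jvm.options, e.g. -Des.networkaddress.cache.ttl=60
            String ttl = System.getProperty("es.networkaddress.cache.ttl", "60");
            String negativeTtl = System.getProperty("es.networkaddress.cache.negative.ttl", "10");
            // Copy them into the JVM security properties that the InetAddress cache reads
            Security.setProperty("networkaddress.cache.ttl", ttl);
            Security.setProperty("networkaddress.cache.negative.ttl", negativeTtl);
        }
    }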

The call chain of main is as follows:

Elasticsearch.main(final String[] args) =>
Elasticsearch.main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) =>
Command.main(String[] args, Terminal terminal) =>
EnvironmentAwareCommand.execute(Terminal terminal, OptionSet options) =>
Elasticsearch.execute(Terminal terminal, OptionSet options, Environment env) =>
Bootstrap.init(final boolean foreground, final Path pidFile, final boolean quiet, final Environment initialEnv)
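
Put differently, Elasticsearch is just a CLI command object: Command.main drives option parsing and top-level error handling, EnvironmentAwareCommand turns the parsed options into an Environment, and the Elasticsearch subclass finally hands off to Bootstrap.init. A heavily simplified, self-contained sketch of that shape (class and method names mirror the chain above; all bodies are illustrative stand-ins, not the real logic):

    // Sketch only: the real classes live in org.elasticsearch.cli / org.elasticsearch.bootstrap.
    final class StartupChainSketch {

        static class Environment { /* settings, config/data/plugins paths, keystore ... */ }

        abstract static class Command {
            // top-level entry: parse options, handle errors, delegate to the concrete command
            final int main(String[] args) throws Exception {
                execute(args);
                return 0; // ExitCodes.OK in the real code
            }
            abstract void execute(String[] args) throws Exception;
        }

        abstract static class EnvironmentAwareCommand extends Command {
            @Override
            final void execute(String[] args) throws Exception {
                // build the initial Environment from -E settings, config path, etc.
                execute(new Environment());
            }
            abstract void execute(Environment env) throws Exception;
        }

        static final class Elasticsearch extends EnvironmentAwareCommand {
            @Override
            void execute(Environment env) throws Exception {
                // the real code passes foreground/pidFile/quiet parsed from the CLI options
                System.out.println("Bootstrap.init(foreground, pidFile, quiet, initialEnv)");
            }
        }

        public static void main(String[] args) throws Exception {
            new Elasticsearch().main(args);
        }
    }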

Next, let's look at Bootstrap.init:

    /**
     * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
     */
    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        // force the class initializer for BootstrapInfo to run before
        // the security manager is installed
        BootstrapInfo.init();

        INSTANCE = new Bootstrap();

        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

        // the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
        // the stream objects before calling LogConfigurator to be able to close them when appropriate
        final Runnable sysOutCloser = getSysOutCloser();
        final Runnable sysErrorCloser = getSysErrorCloser();

        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }

        try {
            final boolean closeStandardStreams = (foreground == false) || quiet;
            if (closeStandardStreams) {
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                sysOutCloser.run();
            }

            // fail if somebody replaced the lucene jars
            // Check the Lucene version: each ES release requires a specific Lucene version,
            // and this guards against someone swapping in an incompatible Lucene jar.
            checkLucene();

            // install the default uncaught exception handler; must be done before security is
            // initialized as we do not want to grant the runtime permission
            // setDefaultUncaughtExceptionHandler
            // The handler maps different errors to different exit codes:
            // InternalError 128
            // OutOfMemoryError 127
            // StackOverflowError 126
            // UnknownError 125
            // IOError 124
            // anything else 1
            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

            // setup() performs, among other things:
            // - check the user that started ES
            // - check JNA (native system calls)
            // - check MEMORY_LOCK
            // - check MaxNumberOfThreads
            // - check MaxSizeVirtualMemory
            // - check MaxFileSize
            // - init the Lucene random seed
            // - register a JVM shutdown hook (used when the Node exits)
            // - check for conflicting jars (jar hell)
            // - initialize JVM security
            // - add validateNodeBeforeAcceptingRequests to the Node instance and construct the Node
            INSTANCE.setup(true, environment);

            try {
                // any secure settings must be read during node construction
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }

            // 1. Start the submodules. They are created and started in the Node class;
            //    a submodule's start method mostly initializes internal state and creates/starts thread pools.
            // 2. Call keepAliveThread.start() to launch the keepalive thread, which does no real work itself.
            //    The main thread exits after the startup flow completes, so the keepalive thread is the only
            //    remaining user thread; its job is to keep the process alive, since a Java process exits
            //    once its user-thread count drops to zero.
            INSTANCE.start();

            // We don't close stderr if `--quiet` is passed, because that
            // hides fatal startup errors. For example, if Elasticsearch is
            // running via systemd, the init script only specifies
            // `--quiet`, not `-d`, so we want users to be able to see
            // startup errors via journalctl.
            if (foreground == false) {
                sysErrorCloser.run();
            }

        } catch (NodeValidationException | RuntimeException e) {
            // disable console logging, so user does not see the exception twice (jvm will show it already)
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (foreground && maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            Logger logger = LogManager.getLogger(Bootstrap.class);
            // HACK, it sucks to do this, but we will run users out of disk space otherwise
            if (e instanceof CreationException) {
                // guice: log the shortened exc to the log file
                ByteArrayOutputStream os = new ByteArrayOutputStream();
                PrintStream ps = null;
                try {
                    ps = new PrintStream(os, false, "UTF-8");
                } catch (UnsupportedEncodingException uee) {
                    assert false;
                    e.addSuppressed(uee);
                }
                new StartupException(e).printStackTrace(ps);
                ps.flush();
                try {
                    logger.error("Guice Exception: {}", os.toString("UTF-8"));
                } catch (UnsupportedEncodingException uee) {
                    assert false;
                    e.addSuppressed(uee);
                }
            } else if (e instanceof NodeValidationException) {
                logger.error("node validation exception\n{}", e.getMessage());
            } else {
                // full exception
                logger.error("Exception", e);
            }
            // re-enable it if appropriate, so they can see any logging during the shutdown process
            if (foreground && maybeConsoleAppender != null) {
                Loggers.addAppender(rootLogger, maybeConsoleAppender);
            }

            throw e;
        }
    }
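
The keepalive thread mentioned in the comments above deserves a closer look: the main thread returns once Bootstrap.init completes, so a non-daemon thread has to stick around to keep the JVM alive until shutdown is requested. A minimal sketch of the pattern (the names are illustrative; Elasticsearch's actual keepAliveThread and keepAliveLatch live in Bootstrap, and the latch is released from a shutdown hook):

    import java.util.concurrent.CountDownLatch;

    public final class KeepAliveSketch {
        public static void main(String[] args) {
            final CountDownLatch keepAliveLatch = new CountDownLatch(1);

            // A user (non-daemon) thread that does nothing but wait: as long as it is
            // alive, the JVM keeps running even after the main thread returns.
            Thread keepAliveThread = new Thread(() -> {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // bail out if interrupted
                }
            }, "elasticsearch[keepAlive]");
            keepAliveThread.setDaemon(false);

            // On shutdown (e.g. SIGTERM), release the latch so the keepalive thread,
            // and with it the process, can exit cleanly.
            Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown));

            keepAliveThread.start();
            // the main thread returns here; the process stays up on the keepalive thread
        }
    }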

Next, let's look at Node construction and startup:

  // The environment mainly carries the node's configuration:
  // dataFiles, configFile, pluginsFile, modulesFile, and so on
  // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/env/Environment.java
  public Node(Environment environment) {
    this(environment, Collections.emptyList(), true);
  }

  /**
   * Constructs a node
   *
   * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
   * @param classpathPlugins           the plugins to be loaded from the classpath
   * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
   *                                   test framework for tests that rely on being able to set private settings
   */
  protected Node(
    final Environment initialEnvironment,
    Collection<Class<? extends Plugin>> classpathPlugins,
    boolean forbidPrivateIndexSettings
  ) {
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    try {
      Settings tmpSettings = Settings
        .builder()
        .put(initialEnvironment.settings())
        .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE)
        .build();

      final JvmInfo jvmInfo = JvmInfo.jvmInfo();
      logger.info(
        "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
        Build.CURRENT.getQualifiedVersion(),
        jvmInfo.pid(),
        Build.CURRENT.flavor().displayName(),
        Build.CURRENT.type().displayName(),
        Build.CURRENT.hash(),
        Build.CURRENT.date(),
        Constants.OS_NAME,
        Constants.OS_VERSION,
        Constants.OS_ARCH,
        Constants.JVM_VENDOR,
        Constants.JVM_NAME,
        Constants.JAVA_VERSION,
        Constants.JVM_VERSION
      );
      if (jvmInfo.getBundledJdk()) {
        logger.info(
          "JVM home [{}], using bundled JDK [{}]",
          System.getProperty("java.home"),
          jvmInfo.getUsingBundledJdk()
        );
      } else {
        logger.info("JVM home [{}]", System.getProperty("java.home"));
        deprecationLogger.deprecate(
          "no-jdk",
          "no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release"
        );
      }
      logger.info(
        "JVM arguments {}",
        Arrays.toString(jvmInfo.getInputArguments())
      );
      if (Build.CURRENT.isProductionRelease() == false) {
        logger.warn(
          "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
          Build.CURRENT.getQualifiedVersion()
        );
      }

      if (logger.isDebugEnabled()) {
        logger.debug(
          "using config [{}], data [{}], logs [{}], plugins [{}]",
          initialEnvironment.configFile(),
          Arrays.toString(initialEnvironment.dataFiles()),
          initialEnvironment.logsFile(),
          initialEnvironment.pluginsFile()
        );
      }

      // Create the PluginsService, which loads all modules under the modules directory
      // and all plugins under the plugins directory
      // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/plugins/PluginsService.java
      this.pluginsService =
        new PluginsService(
          tmpSettings,
          initialEnvironment.configFile(),
          initialEnvironment.modulesFile(),
          initialEnvironment.pluginsFile(),
          classpathPlugins
        );
      final Settings settings = pluginsService.updatedSettings();

      final Set<DiscoveryNodeRole> additionalRoles = pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .map(Plugin::getRoles)
        .flatMap(Set::stream)
        .collect(Collectors.toSet());
      DiscoveryNode.setAdditionalRoles(additionalRoles);

      /*
       * Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
       * values, no matter where they ask for them from.
       */
      this.environment =
        new Environment(settings, initialEnvironment.configFile());
      Environment.assertEquivalent(initialEnvironment, this.environment);
      nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
      logger.info(
        "node name [{}], node ID [{}], cluster name [{}], roles {}",
        NODE_NAME_SETTING.get(tmpSettings),
        nodeEnvironment.nodeId(),
        ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
        DiscoveryNode
          .getRolesFromSettings(settings)
          .stream()
          .map(DiscoveryNodeRole::roleName)
          .collect(Collectors.toCollection(LinkedHashSet::new))
      );
      resourcesToClose.add(nodeEnvironment);
      localNodeFactory =
        new LocalNodeFactory(settings, nodeEnvironment.nodeId());

      // Call each plugin's getExecutorBuilders to collect the ExecutorBuilders
      final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(
        settings
      );
      // Create the thread pool
      final ThreadPool threadPool = new ThreadPool(
        settings,
        executorBuilders.toArray(new ExecutorBuilder[0])
      );
      resourcesToClose.add(
        () -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)
      );

      final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(
        settings,
        threadPool
      );
      resourcesToClose.add(resourceWatcherService);
      // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
      HeaderWarning.setThreadContext(threadPool.getThreadContext());
      resourcesToClose.add(
        () -> HeaderWarning.removeThreadContext(threadPool.getThreadContext())
      );

      final List<Setting<?>> additionalSettings = new ArrayList<>();
      // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
      additionalSettings.add(NODE_DATA_SETTING);
      additionalSettings.add(NODE_INGEST_SETTING);
      additionalSettings.add(NODE_MASTER_SETTING);
      additionalSettings.add(NODE_REMOTE_CLUSTER_CLIENT);
      additionalSettings.addAll(pluginsService.getPluginSettings());
      final List<String> additionalSettingsFilter = new ArrayList<>(
        pluginsService.getPluginSettingsFilter()
      );
      for (final ExecutorBuilder<?> builder : threadPool.builders()) {
        additionalSettings.addAll(builder.getRegisteredSettings());
      }
      // Create the NodeClient
      client = new NodeClient(settings, threadPool);

      // Create the various *Service and *Module objects
      final ScriptModule scriptModule = new ScriptModule(
        settings,
        pluginsService.filterPlugins(ScriptPlugin.class)
      );
      final ScriptService scriptService = newScriptService(
        settings,
        scriptModule.engines,
        scriptModule.contexts
      );
      AnalysisModule analysisModule = new AnalysisModule(
        this.environment,
        pluginsService.filterPlugins(AnalysisPlugin.class)
      );
      // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
      // so we might be late here already

      final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .map(Plugin::getSettingUpgraders)
        .flatMap(List::stream)
        .collect(Collectors.toSet());

      final SettingsModule settingsModule = new SettingsModule(
        settings,
        additionalSettings,
        additionalSettingsFilter,
        settingsUpgraders
      );
      scriptModule.registerClusterSettingsListeners(
        scriptService,
        settingsModule.getClusterSettings()
      );
      final NetworkService networkService = new NetworkService(
        getCustomNameResolvers(
          pluginsService.filterPlugins(DiscoveryPlugin.class)
        )
      );

      List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(
        ClusterPlugin.class
      );
      final ClusterService clusterService = new ClusterService(
        settings,
        settingsModule.getClusterSettings(),
        threadPool
      );
      clusterService.addStateApplier(scriptService);
      resourcesToClose.add(clusterService);
      final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
      if (consistentSettings.isEmpty() == false) {
        clusterService.addLocalNodeMasterListener(
          new ConsistentSettingsService(
            settings,
            clusterService,
            consistentSettings
          )
            .newHashPublisher()
        );
      }
      final IngestService ingestService = new IngestService(
        clusterService,
        threadPool,
        this.environment,
        scriptService,
        analysisModule.getAnalysisRegistry(),
        pluginsService.filterPlugins(IngestPlugin.class),
        client
      );
      final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
      final ClusterInfoService clusterInfoService = newClusterInfoService(
        settings,
        clusterService,
        threadPool,
        client
      );
      final UsageService usageService = new UsageService();

      ModulesBuilder modules = new ModulesBuilder();
      final MonitorService monitorService = new MonitorService(
        settings,
        nodeEnvironment,
        threadPool
      );
      final FsHealthService fsHealthService = new FsHealthService(
        settings,
        clusterService.getClusterSettings(),
        threadPool,
        nodeEnvironment
      );
      final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
      final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
        settings,
        clusterService,
        repositoriesServiceReference::get,
        rerouteServiceReference::get
      );
      final ClusterModule clusterModule = new ClusterModule(
        settings,
        clusterService,
        clusterPlugins,
        clusterInfoService,
        snapshotsInfoService,
        threadPool.getThreadContext()
      );
      modules.add(clusterModule);
      IndicesModule indicesModule = new IndicesModule(
        pluginsService.filterPlugins(MapperPlugin.class)
      );
      modules.add(indicesModule);

      SearchModule searchModule = new SearchModule(
        settings,
        pluginsService.filterPlugins(SearchPlugin.class)
      );
      List<BreakerSettings> pluginCircuitBreakers = pluginsService
        .filterPlugins(CircuitBreakerPlugin.class)
        .stream()
        .map(plugin -> plugin.getCircuitBreaker(settings))
        .collect(Collectors.toList());
      final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
        settingsModule.getSettings(),
        pluginCircuitBreakers,
        settingsModule.getClusterSettings()
      );
      pluginsService
        .filterPlugins(CircuitBreakerPlugin.class)
        .forEach(
          plugin -> {
            CircuitBreaker breaker = circuitBreakerService.getBreaker(
              plugin.getCircuitBreaker(settings).getName()
            );
            plugin.setCircuitBreaker(breaker);
          }
        );
      resourcesToClose.add(circuitBreakerService);
      modules.add(new GatewayModule());

      PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
      BigArrays bigArrays = createBigArrays(
        pageCacheRecycler,
        circuitBreakerService
      );
      modules.add(settingsModule);
      List<NamedWriteableRegistry.Entry> namedWriteables = Stream
        .of(
          NetworkModule.getNamedWriteables().stream(),
          IndicesModule.getNamedWriteables().stream(),
          searchModule.getNamedWriteables().stream(),
          pluginsService
            .filterPlugins(Plugin.class)
            .stream()
            .flatMap(p -> p.getNamedWriteables().stream()),
          ClusterModule.getNamedWriteables().stream()
        )
        .flatMap(Function.identity())
        .collect(Collectors.toList());
      final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
        namedWriteables
      );
      NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
        Stream
          .of(
            NetworkModule.getNamedXContents().stream(),
            IndicesModule.getNamedXContents().stream(),
            searchModule.getNamedXContents().stream(),
            pluginsService
              .filterPlugins(Plugin.class)
              .stream()
              .flatMap(p -> p.getNamedXContent().stream()),
            ClusterModule.getNamedXWriteables().stream()
          )
          .flatMap(Function.identity())
          .collect(toList())
      );
      final MetaStateService metaStateService = new MetaStateService(
        nodeEnvironment,
        xContentRegistry
      );
      final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService(
        nodeEnvironment,
        xContentRegistry,
        bigArrays,
        clusterService.getClusterSettings(),
        threadPool::relativeTimeInMillis
      );

      // collect engine factory providers from plugins
      final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(
        EnginePlugin.class
      );
      final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders = enginePlugins
        .stream()
        .map(
          plugin ->
            (Function<IndexSettings, Optional<EngineFactory>>) plugin::getEngineFactory
        )
        .collect(Collectors.toList());

      final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getDirectoryFactories)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getRecoveryStateFactories)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      final List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getIndexFoldersDeletionListeners)
        .flatMap(List::stream)
        .collect(Collectors.toList());

      final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getSnapshotCommitSuppliers)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService
        .filterPlugins(SystemIndexPlugin.class)
        .stream()
        .collect(
          Collectors.toUnmodifiableMap(
            plugin -> plugin.getClass().getSimpleName(),
            plugin -> plugin.getSystemIndexDescriptors(settings)
          )
        );
      final SystemIndices systemIndices = new SystemIndices(
        systemIndexDescriptorMap
      );

      final SystemIndexManager systemIndexManager = new SystemIndexManager(
        systemIndices,
        client
      );
      clusterService.addListener(systemIndexManager);

      final RerouteService rerouteService = new BatchedRerouteService(
        clusterService,
        clusterModule.getAllocationService()::reroute
      );
      rerouteServiceReference.set(rerouteService);
      clusterService.setRerouteService(rerouteService);

      final IndicesService indicesService = new IndicesService(
        settings,
        pluginsService,
        nodeEnvironment,
        xContentRegistry,
        analysisModule.getAnalysisRegistry(),
        clusterModule.getIndexNameExpressionResolver(),
        indicesModule.getMapperRegistry(),
        namedWriteableRegistry,
        threadPool,
        settingsModule.getIndexScopedSettings(),
        circuitBreakerService,
        bigArrays,
        scriptService,
        clusterService,
        client,
        metaStateService,
        engineFactoryProviders,
        indexStoreFactories,
        searchModule.getValuesSourceRegistry(),
        recoveryStateFactories,
        indexFoldersDeletionListeners,
        snapshotCommitSuppliers
      );

      final AliasValidator aliasValidator = new AliasValidator();

      final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(
        settings,
        clusterService
      );
      final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
        settings,
        clusterService,
        indicesService,
        clusterModule.getAllocationService(),
        aliasValidator,
        shardLimitValidator,
        environment,
        settingsModule.getIndexScopedSettings(),
        threadPool,
        xContentRegistry,
        systemIndices,
        forbidPrivateIndexSettings
      );
      pluginsService
        .filterPlugins(Plugin.class)
        .forEach(
          p ->
            p
              .getAdditionalIndexSettingProviders()
              .forEach(
                metadataCreateIndexService::addAdditionalIndexSettingProvider
              )
        );

      final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(
        threadPool,
        clusterService,
        metadataCreateIndexService
      );

      Collection<Object> pluginComponents = pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .flatMap(
          p ->
            p
              .createComponents(
                client,
                clusterService,
                threadPool,
                resourceWatcherService,
                scriptService,
                xContentRegistry,
                environment,
                nodeEnvironment,
                namedWriteableRegistry,
                clusterModule.getIndexNameExpressionResolver(),
                repositoriesServiceReference::get
              )
              .stream()
        )
        .collect(Collectors.toList());

      ActionModule actionModule = new ActionModule(
        settings,
        clusterModule.getIndexNameExpressionResolver(),
        settingsModule.getIndexScopedSettings(),
        settingsModule.getClusterSettings(),
        settingsModule.getSettingsFilter(),
        threadPool,
        pluginsService.filterPlugins(ActionPlugin.class),
        client,
        circuitBreakerService,
        usageService,
        systemIndices,
        getRestCompatibleFunction()
      );
      modules.add(actionModule);

      final RestController restController = actionModule.getRestController();
      final NetworkModule networkModule = new NetworkModule(
        settings,
        pluginsService.filterPlugins(NetworkPlugin.class),
        threadPool,
        bigArrays,
        pageCacheRecycler,
        circuitBreakerService,
        namedWriteableRegistry,
        xContentRegistry,
        networkService,
        restController,
        clusterService.getClusterSettings()
      );
      Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .map(Plugin::getIndexTemplateMetadataUpgrader)
        .collect(Collectors.toList());
      final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(
        indexTemplateMetadataUpgraders
      );
      final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(
        settings,
        xContentRegistry,
        indicesModule.getMapperRegistry(),
        settingsModule.getIndexScopedSettings(),
        systemIndices,
        scriptService
      );
      if (DiscoveryNode.isMasterNode(settings)) {
        clusterService.addListener(
          new SystemIndexMetadataUpgradeService(systemIndices, clusterService)
        );
      }
      new TemplateUpgradeService(
        client,
        clusterService,
        threadPool,
        indexTemplateMetadataUpgraders
      );
      final Transport transport = networkModule.getTransportSupplier().get();
      Set<String> taskHeaders = Stream
        .concat(
          pluginsService
            .filterPlugins(ActionPlugin.class)
            .stream()
            .flatMap(p -> p.getTaskHeaders().stream()),
          Stream.of(Task.X_OPAQUE_ID)
        )
        .collect(Collectors.toSet());
      final TransportService transportService = newTransportService(
        settings,
        transport,
        threadPool,
        networkModule.getTransportInterceptor(),
        localNodeFactory,
        settingsModule.getClusterSettings(),
        taskHeaders
      );
      final GatewayMetaState gatewayMetaState = new GatewayMetaState();
      final ResponseCollectorService responseCollectorService = new ResponseCollectorService(
        clusterService
      );
      final SearchTransportService searchTransportService = new SearchTransportService(
        transportService,
        client,
        SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
      );
      final HttpServerTransport httpServerTransport = newHttpTransport(
        networkModule
      );
      final IndexingPressure indexingLimits = new IndexingPressure(settings);

      final RecoverySettings recoverySettings = new RecoverySettings(
        settings,
        settingsModule.getClusterSettings()
      );
      RepositoriesModule repositoriesModule = new RepositoriesModule(
        this.environment,
        pluginsService.filterPlugins(RepositoryPlugin.class),
        transportService,
        clusterService,
        bigArrays,
        xContentRegistry,
        recoverySettings
      );
      RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
      repositoriesServiceReference.set(repositoryService);
      SnapshotsService snapshotsService = new SnapshotsService(
        settings,
        clusterService,
        clusterModule.getIndexNameExpressionResolver(),
        repositoryService,
        transportService,
        actionModule.getActionFilters()
      );
      SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
        settings,
        clusterService,
        repositoryService,
        transportService,
        indicesService
      );
      RestoreService restoreService = new RestoreService(
        clusterService,
        repositoryService,
        clusterModule.getAllocationService(),
        metadataCreateIndexService,
        metadataIndexUpgradeService,
        clusterService.getClusterSettings(),
        shardLimitValidator
      );

      final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
        settings,
        clusterService::state,
        clusterService.getClusterSettings(),
        client,
        threadPool::relativeTimeInMillis,
        rerouteService
      );
      clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

      final DiscoveryModule discoveryModule = new DiscoveryModule(
        settings,
        transportService,
        namedWriteableRegistry,
        networkService,
        clusterService.getMasterService(),
        clusterService.getClusterApplierService(),
        clusterService.getClusterSettings(),
        pluginsService.filterPlugins(DiscoveryPlugin.class),
        clusterModule.getAllocationService(),
        environment.configFile(),
        gatewayMetaState,
        rerouteService,
        fsHealthService
      );
      this.nodeService =
        new NodeService(
          settings,
          threadPool,
          monitorService,
          discoveryModule.getDiscovery(),
          transportService,
          indicesService,
          pluginsService,
          circuitBreakerService,
          scriptService,
          httpServerTransport,
          ingestService,
          clusterService,
          settingsModule.getSettingsFilter(),
          responseCollectorService,
          searchTransportService,
          indexingLimits,
          searchModule.getValuesSourceRegistry().getUsageService()
        );

      final SearchService searchService = newSearchService(
        clusterService,
        indicesService,
        threadPool,
        scriptService,
        bigArrays,
        searchModule.getFetchPhase(),
        responseCollectorService,
        circuitBreakerService
      );

      final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
        .filterPlugins(PersistentTaskPlugin.class)
        .stream()
        .map(
          p ->
            p.getPersistentTasksExecutor(
              clusterService,
              threadPool,
              client,
              settingsModule,
              clusterModule.getIndexNameExpressionResolver()
            )
        )
        .flatMap(List::stream)
        .collect(toList());

      final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(
        tasksExecutors
      );
      final PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(
        settings,
        registry,
        clusterService,
        threadPool
      );
      resourcesToClose.add(persistentTasksClusterService);
      final PersistentTasksService persistentTasksService = new PersistentTasksService(
        clusterService,
        threadPool,
        client
      );
      // Bind the instances of the various services and modules
      modules.add(
        b -> {
          b.bind(Node.class).toInstance(this);
          b.bind(NodeService.class).toInstance(nodeService);
          b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
          b.bind(PluginsService.class).toInstance(pluginsService);
          b.bind(Client.class).toInstance(client);
          b.bind(NodeClient.class).toInstance(client);
          b.bind(Environment.class).toInstance(this.environment);
          b.bind(ThreadPool.class).toInstance(threadPool);
          b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
          b
            .bind(ResourceWatcherService.class)
            .toInstance(resourceWatcherService);
          b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
          b.bind(BigArrays.class).toInstance(bigArrays);
          b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
          b.bind(ScriptService.class).toInstance(scriptService);
          b
            .bind(AnalysisRegistry.class)
            .toInstance(analysisModule.getAnalysisRegistry());
          b.bind(IngestService.class).toInstance(ingestService);
          b.bind(IndexingPressure.class).toInstance(indexingLimits);
          b.bind(UsageService.class).toInstance(usageService);
          b
            .bind(AggregationUsageService.class)
            .toInstance(
              searchModule.getValuesSourceRegistry().getUsageService()
            );
          b
            .bind(NamedWriteableRegistry.class)
            .toInstance(namedWriteableRegistry);
          b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader);
          b.bind(MetaStateService.class).toInstance(metaStateService);
          b
            .bind(PersistedClusterStateService.class)
            .toInstance(lucenePersistedStateFactory);
          b.bind(IndicesService.class).toInstance(indicesService);
          b.bind(AliasValidator.class).toInstance(aliasValidator);
          b
            .bind(MetadataCreateIndexService.class)
            .toInstance(metadataCreateIndexService);
          b
            .bind(MetadataCreateDataStreamService.class)
            .toInstance(metadataCreateDataStreamService);
          b.bind(SearchService.class).toInstance(searchService);
          b
            .bind(SearchTransportService.class)
            .toInstance(searchTransportService);
          b
            .bind(SearchPhaseController.class)
            .toInstance(
              new SearchPhaseController(
                namedWriteableRegistry,
                searchService::aggReduceContextBuilder
              )
            );
          b.bind(Transport.class).toInstance(transport);
          b.bind(TransportService.class).toInstance(transportService);
          b.bind(NetworkService.class).toInstance(networkService);
          b
            .bind(UpdateHelper.class)
            .toInstance(new UpdateHelper(scriptService));
          b
            .bind(MetadataIndexUpgradeService.class)
            .toInstance(metadataIndexUpgradeService);
          b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
          b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
          b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
          b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
          {
            processRecoverySettings(
              settingsModule.getClusterSettings(),
              recoverySettings
            );
            b
              .bind(PeerRecoverySourceService.class)
              .toInstance(
                new PeerRecoverySourceService(
                  transportService,
                  indicesService,
                  recoverySettings
                )
              );
            b
              .bind(PeerRecoveryTargetService.class)
              .toInstance(
                new PeerRecoveryTargetService(
                  threadPool,
                  transportService,
                  recoverySettings,
                  clusterService
                )
              );
          }
          b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
          pluginComponents
            .stream()
            .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
          b
            .bind(PersistentTasksService.class)
            .toInstance(persistentTasksService);
          b
            .bind(PersistentTasksClusterService.class)
            .toInstance(persistentTasksClusterService);
          b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
          b.bind(RepositoriesService.class).toInstance(repositoryService);
          b.bind(SnapshotsService.class).toInstance(snapshotsService);
          b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
          b.bind(RestoreService.class).toInstance(restoreService);
          b.bind(RerouteService.class).toInstance(rerouteService);
          b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
          b.bind(FsHealthService.class).toInstance(fsHealthService);
          b.bind(SystemIndices.class).toInstance(systemIndices);
        }
      );
      injector = modules.createInjector();

      // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
      // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
      // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
      // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
      // reroute, which needs to call into the allocation service. We close the loop here:
      clusterModule.setExistingShardsAllocators(
        injector.getInstance(GatewayAllocator.class)
      );

      List<LifecycleComponent> pluginLifecycleComponents = pluginComponents
        .stream()
        .filter(p -> p instanceof LifecycleComponent)
        .map(p -> (LifecycleComponent) p)
        .collect(Collectors.toList());
      resourcesToClose.addAll(pluginLifecycleComponents);
      resourcesToClose.add(
        injector.getInstance(PeerRecoverySourceService.class)
      );
      this.pluginLifecycleComponents =
        Collections.unmodifiableList(pluginLifecycleComponents);
      client.initialize(
        injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
        transportService.getTaskManager(),
        () -> clusterService.localNode().getId(),
        transportService.getLocalNodeConnection(),
        transportService.getRemoteClusterService(),
        namedWriteableRegistry
      );
      this.namedWriteableRegistry = namedWriteableRegistry;

      logger.debug("initializing HTTP handlers ...");
      actionModule.initRestHandlers(() -> clusterService.state().nodes());
      logger.info("initialized");

      success = true;
    } catch (IOException ex) {
      throw new ElasticsearchException("failed to bind service", ex);
    } finally {
      if (!success) {
        IOUtils.closeWhileHandlingException(resourcesToClose);
      }
    }
  }

  /**
   * Start the node. If the node is already started, this method is no-op.
   */
  public Node start() throws NodeValidationException {
    // Move the local node's lifecycle state to STARTED
    if (!lifecycle.moveToStarted()) {
      return this;
    }

    logger.info("starting ...");
    // plugins start
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RepositoriesService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(FsHealthService.class).start();
    nodeService.getMonitorService().start();

    final ClusterService clusterService = injector.getInstance(
      ClusterService.class
    );

    final NodeConnectionsService nodeConnectionsService = injector.getInstance(
      NodeConnectionsService.class
    );
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    injector.getInstance(GatewayService.class).start();
    Discovery discovery = injector.getInstance(Discovery.class);
    clusterService
      .getMasterService()
      .setClusterStatePublisher(discovery::publish);

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(
      TransportService.class
    );
    transportService
      .getTaskManager()
      .setTaskResultsService(injector.getInstance(TaskResultsService.class));
    transportService
      .getTaskManager()
      .setTaskCancellationService(
        new TaskCancellationService(transportService)
      );
    transportService.start();
    assert localNodeFactory.getNode() != null;
    assert transportService
      .getLocalNode()
      .equals(
        localNodeFactory.getNode()
      ) : "transportService has a different local node than the factory provided";
    injector.getInstance(PeerRecoverySourceService.class).start();

    // Load (and maybe upgrade) the metadata stored on disk
    final GatewayMetaState gatewayMetaState = injector.getInstance(
      GatewayMetaState.class
    );
    gatewayMetaState.start(
      settings(),
      transportService,
      clusterService,
      injector.getInstance(MetaStateService.class),
      injector.getInstance(MetadataIndexUpgradeService.class),
      injector.getInstance(MetadataUpgrader.class),
      injector.getInstance(PersistedClusterStateService.class)
    );
    if (Assertions.ENABLED) {
      try {
        assert injector
          .getInstance(MetaStateService.class)
          .loadFullState()
          .v1()
          .isEmpty();
        final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(
          logger,
          NamedXContentRegistry.EMPTY,
          nodeEnvironment.nodeDataPaths()
        );
        assert nodeMetadata != null;
        assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
        assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
      } catch (IOException e) {
        assert false : e;
      }
    }
    // we load the global state here (the persistent part of the cluster state stored on disk) to
    // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
    final Metadata onDiskMetadata = gatewayMetaState
      .getPersistedState()
      .getLastAcceptedState()
      .metadata();
    assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
    validateNodeBeforeAcceptingRequests(
      new BootstrapContext(environment, onDiskMetadata),
      transportService.boundAddress(),
      pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .flatMap(p -> p.getBootstrapChecks().stream())
        .collect(Collectors.toList())
    );

    clusterService.addStateApplier(transportService.getTaskManager());
    // start after transport service so the local disco is known
    discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
    clusterService.start();
    assert clusterService
      .localNode()
      .equals(
        localNodeFactory.getNode()
      ) : "clusterService has a different local node than the factory provided";
    // start accepting incoming requests.
    // when the transport layer starts up it will block any incoming requests until this method is called.
    transportService.acceptIncomingRequests();
    // We'll take a closer look at the election (initial join) logic later
    discovery.startInitialJoin();
    final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(
      settings()
    );
    configureNodeAndClusterIdStateListener(clusterService);

    if (initialStateTimeout.millis() > 0) {
      final ThreadPool thread = injector.getInstance(ThreadPool.class);
      ClusterState clusterState = clusterService.state();
      ClusterStateObserver observer = new ClusterStateObserver(
        clusterState,
        clusterService,
        null,
        logger,
        thread.getThreadContext()
      );

      if (clusterState.nodes().getMasterNodeId() == null) {
        logger.debug(
          "waiting to join the cluster. timeout [{}]",
          initialStateTimeout
        );
        final CountDownLatch latch = new CountDownLatch(1);
        // Wait for the next cluster state which satisfies statePredicate
        observer.waitForNextChange(
          new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
              latch.countDown();
            }

            @Override
            public void onClusterServiceClose() {
              latch.countDown();
            }

            @Override
            public void onTimeout(TimeValue timeout) {
              logger.warn(
                "timed out while waiting for initial discovery state - timeout: {}",
                initialStateTimeout
              );
              latch.countDown();
            }
          },
          state -> state.nodes().getMasterNodeId() != null,
          initialStateTimeout
        );

        try {
          latch.await();
        } catch (InterruptedException e) {
          throw new ElasticsearchTimeoutException(
            "Interrupted while waiting for initial discovery state"
          );
        }
      }
    }

    injector.getInstance(HttpServerTransport.class).start();

    if (WRITE_PORTS_FILE_SETTING.get(settings())) {
      TransportService transport = injector.getInstance(TransportService.class);
      writePortsFile("transport", transport.boundAddress());
      HttpServerTransport http = injector.getInstance(
        HttpServerTransport.class
      );
      writePortsFile("http", http.boundAddress());
    }

    logger.info("started");

    pluginsService
      .filterPlugins(ClusterPlugin.class)
      .forEach(ClusterPlugin::onNodeStarted);

    return this;
  }
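
Note where validateNodeBeforeAcceptingRequests sits in start(): after the on-disk cluster state has been loaded (so checks can inspect it), but before transportService.acceptIncomingRequests(). It runs the bootstrap checks, including any contributed by plugins via Plugin#getBootstrapChecks. As a rough sketch of what a plugin-supplied check looks like — BootstrapCheck, BootstrapContext, and Plugin#getBootstrapChecks are the real 7.x extension points, but this particular check and its two-processor threshold are invented for illustration:

    import java.util.Collections;
    import java.util.List;

    import org.elasticsearch.bootstrap.BootstrapCheck;
    import org.elasticsearch.bootstrap.BootstrapContext;
    import org.elasticsearch.plugins.Plugin;

    // Hypothetical plugin contributing one extra bootstrap check.
    public class MinProcessorsPlugin extends Plugin {

        @Override
        public List<BootstrapCheck> getBootstrapChecks() {
            return Collections.singletonList(new BootstrapCheck() {
                @Override
                public BootstrapCheckResult check(BootstrapContext context) {
                    int processors = Runtime.getRuntime().availableProcessors();
                    return processors >= 2
                        ? BootstrapCheckResult.success()
                        : BootstrapCheckResult.failure(
                            "at least 2 processors are required, found [" + processors + "]");
                }
            });
        }
    }

Like the built-in checks, a failure here only aborts startup when the bootstrap checks are enforced (the node binds to a non-loopback address, i.e. production mode), unless the check overrides alwaysEnforce().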

Summary

The checks, initialization, and cluster-join steps performed during node startup are now clear, but I did not find where data is synchronized after a node joins the cluster in this part of the code.

I'll look for the answer to that question later, when examining cluster management.

References

https://www.modb.pro/db/33681
https://easyice.cn/archives/332
