TARS Source Code Analysis 1: Server Startup

Posted Small leaf



This example comes from TarsJava/examples/quickstart-server/.
Create a launcher class, App.java:

package com.qq.tars.quickstart.server;

/**
 * @author : yesheng
 * @Description :
 * @Date : 2020-01-31
 */
public class App {

    public static void main(String[] args) throws Exception {
        // Delegate to the TARS server entry point.
        com.qq.tars.server.startup.Main.main(args);
    }
}

Set the JVM option to:

-Dconfig=/Users/yesheng/Documents/huya/tars/TarsJava/examples/quickstart-server/src/main/resources/TestApp.HelloJavaServer.config.conf

Running App.main then starts the server.

1. How the server starts

1.1 AppContext


public interface AppContext {

   /**
    * Returns the service name.
    * @return the service name
    */
   String name();

   /**
    * Stops the application context.
    */
   void stop();

   ServantHomeSkeleton getCapHomeSkeleton(String homeName);

   List<Filter> getFilters(FilterKind kind);

   void addFilter(FilterKind kind, Filter filter);

   void init();
}

These are the main methods of a TARS application context.

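public abstract class BaseAppContext implements AppContext {
    // ...
}

BaseAppContext is an abstract class that implements AppContext.
When an abstract class implements an interface, it usually applies the template method pattern: one template (abstract) method is deferred to subclasses, so each kind of AppContext supplies its own implementation.

BaseAppContext subclasses:

  1. XmlAppContext: reads servants from XML (servants.xml)
  2. SpringAppContext: reads servants through Spring (servants-spring.xml)
  3. SpringBootAppContext: reads servants from annotations (@TarsServant)
  4. TarsStartLifecycle: the spring-cloud variant

This also shows how to build a custom container. To read servants from JSON, for example, extend BaseAppContext and implement the corresponding methods.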
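As a rough illustration of that extension point, the sketch below mimics the template-method shape with simplified stand-in classes. MiniAppContext, ListAppContext and the comma-separated source string are hypothetical and not part of TarsJava; a real implementation would extend BaseAppContext and parse an actual JSON file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BaseAppContext: init() is the fixed template,
// loadServants() is the step each subclass supplies.
abstract class MiniAppContext {
    protected final List<String> servants = new ArrayList<>();
    private boolean ready;

    public final void init() {
        try {
            loadServants();   // subclass-specific loading step
            ready = true;
        } catch (Exception ex) {
            ready = false;
        }
    }

    public boolean isReady() { return ready; }

    public List<String> getServants() { return servants; }

    protected abstract void loadServants() throws Exception;
}

// Hypothetical subclass: reads servant names from a comma-separated string,
// standing in for a JSON (or any other) configuration source.
class ListAppContext extends MiniAppContext {
    private final String source;

    ListAppContext(String source) { this.source = source; }

    @Override
    protected void loadServants() {
        for (String name : source.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                servants.add(trimmed);
            }
        }
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        MiniAppContext ctx = new ListAppContext("HelloObj, EchoObj");
        ctx.init();
        System.out.println(ctx.isReady() + " " + ctx.getServants());
    }
}
```

The point of the design is that the startup sequence lives in one place, while only the loading step varies per configuration format.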

1.2 初始化Server

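private Server() {
    System.out.println("[TARS] start server construction");
    loadServerConfig();
    initCommunicator();
    startManagerService();
}

  1. loadServerConfig: loads the server configuration and reads the application name, IP, port, logging settings, and so on.
  2. initCommunicator: initializes the Communicator, which the management services in section 2 use for their calls.
  3. startManagerService: starts the management services that report this server to the management center. It creates three threads: NodeHandleThread for the node heartbeat, StatHandleThread for server status, and PropertyHandleThread for periodically reporting instrumentation metrics.

Constructing the Server therefore also sets up the heartbeat reporting.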

1.3 Loading servants

@Override
public void init() {
    try {
        loadServants();
        // inject the OM admin servant
        injectAdminServant();
        initServants();
        appContextStarted();
        System.out.println("[SERVER] The application started successfully.");
    } catch (Exception ex) {
        ready = false;
        ex.printStackTrace();
        System.out.println("[SERVER] failed to start the application.");
    }
}

loadServants is the template method: each kind of TARS context reads its servants in its own way.
All the other steps are implemented by BaseAppContext.

XmlAppContext.loadServants

protected void loadServants() throws Exception {
    XMLConfigFile cfg = new XMLConfigFile();
    cfg.parse(getClass().getClassLoader().getResource("servants.xml").openStream());
    XMLConfigElement root = cfg.getRootElement();
    ArrayList<XMLConfigElement> elements = root.getChildList();

    loadInitParams(root.getChildListByName("context-param"));

    loadAppContextListeners(elements);

    loadAppServants(elements);

    loadDefaultFilter();

    loadAppFilters(elements);
}

In short:

  1. Parse servants.xml
  2. Load the initialization parameters
  3. Load the app context listeners
  4. Load the servants
  5. Load the default filter
  6. Load the custom filters

Each servant element is handled by loadServant:

private ServantHomeSkeleton loadServant(XMLConfigElement element) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
    String homeName = null, homeApiName = null, homeClassName = null, processorClazzName = null,
            codecClazzName = null;
    Class<?> homeApiClazz = null;
    Class<? extends Codec> codecClazz = null;
    Class<? extends Processor> processorClazz = null;
    Object homeClassImpl = null;
    ServantHomeSkeleton skeleton = null;
    int maxLoadLimit = -1;

    ServerConfig serverCfg = ConfigurationManager.getInstance().getServerConfig();

    // The default servant name is <application>.<server>.<name attribute>.
    homeName = element.getStringAttribute("name");
    if (StringUtils.isEmpty(homeName)) {
        throw new RuntimeException("servant name is null.");
    }
    homeName = String.format("%s.%s.%s", serverCfg.getApplication(), serverCfg.getServerName(), homeName);
    homeApiName = getChildNodeValue(element, "home-api");
    homeClassName = getChildNodeValue(element, "home-class");
    processorClazzName = getChildNodeValue(element, "home-processor-class");
    codecClazzName = getChildNodeValue(element, "home-codec-class");

    homeApiClazz = Class.forName(homeApiName);
    homeClassImpl = Class.forName(homeClassName).newInstance();
    codecClazz = (Class<? extends Codec>) (StringUtils.isEmpty(codecClazzName) ? null : Class.forName(codecClazzName));
    processorClazz = (Class<? extends Processor>) (StringUtils.isEmpty(processorClazzName) ? null : Class.forName(processorClazzName));

    // A fully qualified name on the @Servant annotation overrides the default name.
    if (TarsHelper.isServant(homeApiClazz)) {
        String servantName = homeApiClazz.getAnnotation(Servant.class).name();
        if (!StringUtils.isEmpty(servantName) && servantName.matches("^[\\w]+\\.[\\w]+\\.[\\w]+$")) {
            homeName = servantName;
        }
    }

    ServantAdapterConfig servantAdapterConfig = serverCfg.getServantAdapterConfMap().get(homeName);

    // Binding the adapter to the skeleton opens the listening port.
    ServantAdapter ServerAdapter = new ServantAdapter(servantAdapterConfig);
    skeleton = new ServantHomeSkeleton(homeName, homeClassImpl, homeApiClazz, codecClazz, processorClazz, maxLoadLimit);
    skeleton.setAppContext(this);
    ServerAdapter.bind(skeleton);
    servantAdapterMap.put(homeName, ServerAdapter);
    return skeleton;
}

The part to focus on is the end of that method:

ServantAdapterConfig servantAdapterConfig = serverCfg.getServantAdapterConfMap().get(homeName);

ServantAdapter ServerAdapter = new ServantAdapter(servantAdapterConfig);
skeleton = new ServantHomeSkeleton(homeName, homeClassImpl, homeApiClazz, codecClazz, processorClazz, maxLoadLimit);
skeleton.setAppContext(this);
ServerAdapter.bind(skeleton);

In other words, the NIO server is started while the servant is being loaded, inside ServantAdapter.bind.
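The name used to look up the adapter configuration in loadServant follows a simple rule: application, server name and servant name joined with dots, unless the @Servant annotation already carries a three-part name. A minimal sketch of just that rule is below; ServantNames and resolve are hypothetical helpers, and "HelloObj" is an assumed servant name (TestApp and HelloJavaServer are taken from the config file name used earlier).

```java
// Hypothetical helper that mirrors only the naming rule from loadServant.
class ServantNames {
    // Three dot-separated word segments, e.g. App.Server.Obj
    private static final String FULL_NAME = "^[\\w]+\\.[\\w]+\\.[\\w]+$";

    static String resolve(String application, String serverName, String xmlName, String annotationName) {
        if (xmlName == null || xmlName.isEmpty()) {
            throw new IllegalArgumentException("servant name is null.");
        }
        // Default: <application>.<server>.<name from the XML attribute>
        String homeName = String.format("%s.%s.%s", application, serverName, xmlName);
        // A fully qualified annotation name takes precedence.
        if (annotationName != null && !annotationName.isEmpty() && annotationName.matches(FULL_NAME)) {
            homeName = annotationName;
        }
        return homeName;
    }

    public static void main(String[] args) {
        System.out.println(resolve("TestApp", "HelloJavaServer", "HelloObj", ""));
        System.out.println(resolve("TestApp", "HelloJavaServer", "HelloObj", "Other.Server.Obj"));
    }
}
```

An annotation name that is not in the three-part form is ignored, so the XML-derived name stays in effect.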

ServantAdapter.bind

public void bind(AppService appService) throws IOException {
    this.skeleton = (ServantHomeSkeleton) appService;
    ServerConfig serverCfg = ConfigurationManager.getInstance().getServerConfig();

    boolean keepAlive = true;
    Codec codec = createCodec(serverCfg);
    Processor processor = createProcessor(serverCfg);
    Executor threadPool = ServantThreadPoolManager.get(servantAdapterConfig);

    Endpoint endpoint = this.servantAdapterConfig.getEndpoint();
    if (endpoint.type().equals("tcp")) {
        this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false);
        this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
        this.selectorManager.start();

        System.out.println("[SERVER] server starting at " + endpoint + "...");
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024);
        serverChannel.configureBlocking(false);

        // TCP: the first reactor waits for incoming connections.
        selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);

        System.out.println("[SERVER] server started at " + endpoint + "...");

    } else if (endpoint.type().equals("udp")) {

        this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);
        this.selectorManager.start();

        System.out.println("[SERVER] server starting at " + endpoint + "...");
        DatagramChannel serverChannel = DatagramChannel.open();
        DatagramSocket socket = serverChannel.socket();
        socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));
        serverChannel.configureBlocking(false);

        // UDP: there is no accept step, so the channel is registered for reads directly.
        this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);
        System.out.println("[SERVER] servant started at " + endpoint + "...");
    }
}

bind opens the service port so that clients can call the servant.
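The TCP branch boils down to the standard java.nio sequence: open a ServerSocketChannel, bind it, switch it to non-blocking mode, and register it for OP_ACCEPT. The sketch below shows that sequence with a plain Selector instead of the TARS SelectorManager and reactor; NioBindSketch is a hypothetical class, and port 0 (let the OS pick a free port) is an assumption made so the example can run anywhere.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class NioBindSketch {
    // Opens a non-blocking server channel on host:port and registers it for accept events.
    static SelectionKey bind(Selector selector, String host, int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(host, port), 1024); // backlog of 1024, as in bind above
        serverChannel.configureBlocking(false);
        return serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            SelectionKey key = bind(selector, "127.0.0.1", 0);
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            System.out.println("listening on " + channel.getLocalAddress());
            channel.close();
        }
    }
}
```

A channel must be non-blocking before it can be registered with a selector, which is why configureBlocking(false) comes before the registration in both the sketch and the original method.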

SpringAppContext.loadServants

protected void loadServants() {
    this.applicationContext = new ClassPathXmlApplicationContext("servants-spring.xml");

    loadAppContextListeners(this.applicationContext);
    loadAppServants(this.applicationContext);
    loadDefaultFilter();
    loadAppFilters(this.applicationContext);
}

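In short:

  1. Load servants-spring.xml
  2. Load the app context listeners
  3. Load the servants
  4. Load the default filter
  5. Load the custom filters

SpringBootAppContext loads in much the same way; only the way it reads the servants and the other components differs.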

public abstract class BaseAppContext implements AppContext {
    boolean ready = true;

    ConcurrentHashMap<String, ServantHomeSkeleton> skeletonMap = new ConcurrentHashMap<>();
    ConcurrentHashMap<String, Adapter> servantAdapterMap = new ConcurrentHashMap<>();

    HashMap<String, String> contextParams = new HashMap<>();

    Set<AppContextListener> listeners = new HashSet<>(4);

    Map<FilterKind, List<Filter>> filters = new HashMap<>();

    // ...
}

The objects created during loading are stored in BaseAppContext for later use.

1.4 AppContextListener


public interface AppContextListener {

    void appContextStarted(AppContextEvent event);

    void appServantStarted(AppServantEvent event);
}

appContextStarted is called when the server context has started; it fires after appServantStarted.

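void appContextStarted() {
    for (AppContextListener listener : listeners) {
        listener.appContextStarted(new DefaultAppContextEvent(this));
    }
}

The container finishes starting in appContextStarted. In other words, once the TARS servants have been started and registered, we can initialize our own container; a Spring or Spring Boot application can be started from this callback.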
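The callback order can be pictured with a small self-contained sketch. MiniListener, MiniListenerContext and RecordingListener are hypothetical stand-ins that reduce the TARS event objects to plain strings; they only illustrate that servant callbacks arrive before the context callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for AppContextListener; events are reduced to strings.
interface MiniListener {
    void appContextStarted(String contextName);
    void appServantStarted(String servantName);
}

// Hypothetical stand-in for the context: one callback per servant, then one for the context.
class MiniListenerContext {
    private final Set<MiniListener> listeners = new LinkedHashSet<>();

    void addListener(MiniListener listener) { listeners.add(listener); }

    void start(String contextName, List<String> servantNames) {
        for (String servant : servantNames) {
            for (MiniListener listener : listeners) {
                listener.appServantStarted(servant);
            }
        }
        for (MiniListener listener : listeners) {
            listener.appContextStarted(contextName);
        }
    }
}

// Records callbacks in arrival order; a real listener might bootstrap its own
// container inside appContextStarted instead.
class RecordingListener implements MiniListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void appContextStarted(String contextName) { events.add("context:" + contextName); }

    @Override
    public void appServantStarted(String servantName) { events.add("servant:" + servantName); }
}

class ListenerDemo {
    public static void main(String[] args) {
        MiniListenerContext ctx = new MiniListenerContext();
        RecordingListener recorder = new RecordingListener();
        ctx.addListener(recorder);
        ctx.start("TestApp.HelloJavaServer", Arrays.asList("HelloObj"));
        System.out.println(recorder.events);
    }
}
```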

2. How the server registers with the registry

private Server() {
    System.out.println("[TARS] start server construction");
    loadServerConfig();
    initCommunicator();
    startManagerService();
}

private void startManagerService() {
    OmServiceMngr.getInstance().initAndStartOmService();
}

// OmServiceMngr
public void initAndStartOmService() {
    Communicator communicator = CommunicatorFactory.getInstance().getCommunicator();
    String app = ConfigurationManager.getInstance().getServerConfig().getApplication();
    String serverName = ConfigurationManager.getInstance().getServerConfig().getServerName();
    String basePath = ConfigurationManager.getInstance().getServerConfig().getBasePath();
    String modualName = ConfigurationManager.getInstance().getServerConfig().getCommunicatorConfig().getModuleName();

    // Hand the communicator and server identity to the management helpers.
    ConfigHelper.getInstance().setConfigInfo(communicator, app, serverName, basePath);
    NodeHelper.getInstance().setNodeInfo(communicator, app, serverName);
    NotifyHelper.getInstance().setNotifyInfo(communicator, app, serverName);
    PropertyReportHelper.getInstance().init(communicator, modualName);
    NodeHelper.getInstance().reportVersion(ClientVersion.getVersion());

    Policy avgPolicy = new CommonPropertyPolicy.Avg();
    Policy maxPolicy = new CommonPropertyPolicy.Max();
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropWaitTime, avgPolicy, maxPolicy);

    // JVM metrics: heap, thread count, and per-collector GC count and time.
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropHeapUsed, new MemoryHeapUsedAvg());
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropHeapCommitted, new MemoryHeapCommittedAvg());
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropHeapMax, new MemoryHeapMaxAvg());
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropThreadCount, new ThreadNumAvg());
    for (GarbageCollectorMXBean gcMXBean : ManagementFactory.getGarbageCollectorMXBeans()) {
        PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropGcCount + gcMXBean.getName(), new GCNumCount(gcMXBean.getName()));
        PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropGcTime + gcMXBean.getName(), new GCTimeSum(gcMXBean.getName()));
    }

    ServerStatHelper.getInstance().init(communicator);
    TarsTraceZipkinConfiguration.getInstance().init();
    ScheduledServiceMngr.getInstance().start();
}

// ScheduledServiceMngr
public void start() {
    startHandleService();
}

private void startHandleService() {
    // Heartbeat: starts immediately, interval given in seconds.
    Runnable nodeHandler = new Thread(new NodeHandleThread(), "HeartBeat");
    taskExecutorManager.scheduleAtFixedRate(nodeHandler, 0, HEART_BEAT_INTERVAL, TimeUnit.SECONDS);

    // Server statistics: first run is delayed by a random offset, interval given in milliseconds.
    int initialDelay = REPORT_INTERVAL + (random.nextInt(30) * 1000);
    Runnable statHandler = new Thread(new StatHandleThread(), "ServerStat");
    taskExecutorManager.scheduleAtFixedRate(statHandler, initialDelay, REPORT_INTERVAL, TimeUnit.MILLISECONDS);

    // Property reporting never runs more often than the minimum interval.
    if (REPORT_INTERVAL < PROPERTY_REPORT_MIN_INTERVAL) {
        REPORT_INTERVAL = PROPERTY_REPORT_MIN_INTERVAL;
    }
    initialDelay = REPORT_INTERVAL + (random.nextInt(30) * 1000);
    Runnable propertyHandler = new Thread(new PropertyHandleThread(), "PropertyReport");
    taskExecutorManager.scheduleAtFixedRate(propertyHandler, initialDelay, REPORT_INTERVAL, TimeUnit.MILLISECONDS);
}

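Reading this far, I noticed that the TARS source leans heavily on singletons. A well-designed framework should not need this many; it should rely more on interfaces, abstraction, and composition.

Moving on. The key method is startHandleService, which schedules three periodic tasks:

  1. HeartBeat: the heartbeat. Every 10 seconds it calls NodeHelper.getInstance().keepAlive() to send a heartbeat to the server side.
  2. ServerStat: server status reporting.
  3. PropertyReport: property reporting, that is, uploading the instrumentation metrics.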
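The scheduling idea behind these tasks can be sketched with the JDK's ScheduledExecutorService. HeartbeatSketch is a hypothetical class, the 50 ms interval is chosen only to keep the demo short (the article's heartbeat runs every 10 seconds), and the beat action is a counter rather than a real keepAlive call. One detail worth noting: a Thread object passed to scheduleAtFixedRate is treated as a plain Runnable and runs on a pool thread, so the sketch names the worker thread through a ThreadFactory instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {
    // Runs `beat` immediately and then at a fixed rate; returns the executor so the caller can stop it.
    static ScheduledExecutorService start(Runnable beat, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "HeartBeat"); // the worker thread carries the name
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(beat, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger beats = new AtomicInteger();
        CountDownLatch threeBeats = new CountDownLatch(3);
        ScheduledExecutorService executor = start(() -> {
            beats.incrementAndGet();   // stand-in for sending a keep-alive
            threeBeats.countDown();
        }, 50);
        threeBeats.await(2, TimeUnit.SECONDS);
        executor.shutdownNow();
        System.out.println("beats sent: " + beats.get());
    }
}
```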

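At this point the service is registered with the registry. If the registry stops receiving heartbeats, it removes the service's status. Many registries work on the same principle, Eureka and ZooKeeper among them.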
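The registry side of that contract can be sketched as a map of last-seen timestamps that is swept for stale entries. This is not the TARS registry implementation; ExpiringRegistry, its method names and the 30-second timeout are all hypothetical, and the current time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that forgets services whose heartbeats have stopped.
class ExpiringRegistry {
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    ExpiringRegistry(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    // Records (or refreshes) a heartbeat for the given service.
    void heartbeat(String service, long nowMillis) {
        lastSeen.put(service, nowMillis);
    }

    boolean isAlive(String service) {
        return lastSeen.containsKey(service);
    }

    // Removes every service whose last heartbeat is older than the timeout
    // and returns the names that were removed.
    List<String> evictExpired(long nowMillis) {
        List<String> removed = new ArrayList<>();
        lastSeen.entrySet().removeIf(entry -> {
            boolean expired = nowMillis - entry.getValue() > timeoutMillis;
            if (expired) {
                removed.add(entry.getKey());
            }
            return expired;
        });
        return removed;
    }

    public static void main(String[] args) {
        ExpiringRegistry registry = new ExpiringRegistry(30_000);
        registry.heartbeat("TestApp.HelloJavaServer", 0);
        registry.heartbeat("TestApp.OtherServer", 25_000);
        System.out.println("evicted: " + registry.evictExpired(31_000));
        System.out.println("other still alive: " + registry.isAlive("TestApp.OtherServer"));
    }
}
```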

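3. Summary

TARS follows the same pattern as other microservice frameworks such as Dubbo and Spring Cloud.

  1. Initialize the node service: read the service configuration, chiefly the IP, port, service name, and so on.
  2. Register with the registry (the TARS registry, Eureka, ZooKeeper) and send heartbeats periodically to stay healthy.
  3. Initialize the service: its protocol, interfaces, version, and similar information.
  4. Start the service.

The framework exposes many extension interfaces along the way, but the fundamentals do not change: grasp the core flow, read how the source implements it, and you will be able to extend it.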
