Tars | 第2篇 TarsJava SpingBoot启动与负载均衡源码初探 #yyds干货盘点#

Posted 多氯环己烷

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tars | 第2篇 TarsJava SpingBoot启动与负载均衡源码初探 #yyds干货盘点#相关的知识,希望对你有一定的参考价值。

@[TOC](TarsJava SpingBoot启动与负载均衡源码初探)


前言

通过源码分析可以得出这样一个负载均衡的源码结构图(基于TarsJava SpringBoot):

@EnableTarsServer注解:表明这是一个Tars服务;

  • @Import(TarsServerConfiguration.class):引入Tars服务相关配置文件;
    • Communcator:通信器;
    • getServantProxyFactory():获取代理工厂管理者;
    • getObjectProxyFactory():获取对象代理工厂;
      • createLoadBalance():创建客户端负载均衡调用器;
      • select():选择负载均衡调用器(有四种模式可以选择);
        • invoker:调用器;
        • invoke():具体的执行方法;
          • doInvokeServant():最底层的执行方法;
      • refresh():更新负载均衡调用器;
      • createProtocolInvoker():创建协议调用器;

注:在说明注解时,第一点加粗为注解中文含义,第二点为一般加在哪身上,缩进或代码块为示例,如:

@注解

  • 中文含义
  • 加在哪
  • 其他……
    • 语句示例
      //代码示例

1. Tars客户端启动

一个基础知识,SpringBoot应用入口在主启动类,Tars SpringBoot的主启动类是这样的:

可以发现它与普通SpringBoot应用的区别在于多了个@EnableTarsServer注解;

@EnableTarsServer

  • Tars服务;
  • 用在主启动类上;
  • 表名该服务是一个Tars服务,启用Tars功能;

我们从examples/tars-spring-boot-client的主启动类App.java(@EnableTarsServer注解)点进去,可以看到SpringBoot在启动时帮我们做了哪些Tars相关的配置:

@EnableTarsServer注解源码:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TarsServerConfiguration.class)
public @interface EnableTarsServer 

可以知道他帮我们引入了Tars服务配置类TarsServerConfiguration.class,我们点进去:

@Configuration
public class TarsServerConfiguration 

    private final Server server = Server.getInstance();

    @Bean
    public Server server() 
        return this.server;
    

    @Bean
    // 从通信器工厂注入通信器Communcator
    public Communicator communicator() 
        return CommunicatorFactory.getInstance().getCommunicator();
    

    @Bean
    //通信器后置处理器
    public CommunicatorBeanPostProcessor communicatorBeanPostProcessor(Communicator communicator) 
        return new CommunicatorBeanPostProcessor(communicator);
    

    @Bean
    //注入配置帮助器
    public ConfigHelper configHelper() 
        return ConfigHelper.getInstance();
    

    @Bean
    //注入Servlet容器定制器
    public ServletContainerCustomizer servletContainerCustomizer() 
        return new ServletContainerCustomizer();
    

    @Bean
    //Tars服务器启动生命周期
    public TarsServerStartLifecycle applicationStartLifecycle(Server server) 
        return new TarsServerStartLifecycle(server);
    

在这些容器中,可以看出最重要的是通信器Communicator,里面定义了代理方式、配置文件、负载均衡选择器等重要属性,下面我们来分析这个容器

2. Communicator通信器

通过源码分析,我们可以知道这个容器里有通信器相关初始化initCommunicator()、关闭shutdown()、获取容器idgetId()等基础方法,此外,有几个比较关键的方法:

  1. getCommunicatorConfig:获取客户端协调器的配置文件。该配置文件里做了一些超时、线程数等相关配置;

  1. getServantProxyFactory:获取代理工厂管理者。管理者的主要作用是管理ObjectProxyFactory,如果缓存有就从缓存中取,没有就生产;

    public <T> Object getServantProxy(Class<T> clazz, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                     LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) 
       //获取管理者的键
       String key = setDivision != null ? clazz.getSimpleName() + objName + setDivision : clazz.getSimpleName() + objName;
       //通过键从缓存中获取管理者的值
       Object proxy = cache.get(key);
       if (proxy == null) 
           lock.lock();
           try 
               proxy = cache.get(key);
               if (proxy == null) 
                   //创建管理者
                   ObjectProxy<T> objectProxy = communicator.getObjectProxyFactory().getObjectProxy(
                       clazz, objName, setDivision, servantProxyConfig, loadBalance, protocolInvoker);
                   //将管理者放进缓存
                   cache.put(key, createProxy(clazz, objectProxy));
                   proxy = cache.get(key);
               
            finally 
               lock.unlock();
           
       
       return proxy;
    
  2. getObjectProxyFactory:获取对象代理工厂。该工厂的作用是生产对象代理ObjectProxy,包括创建Servant服务的配置信息与更新服务端点等:

    //生产对象代理ObjectProxy
    public <T> ObjectProxy<T> getObjectProxy(Class<T> api, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                            LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException 
       //如果容器里没有服务代理相关配置,则生成默认配置;如果容器里有服务代理相关配置,说明用户自定义了用户配置了服务代理,则读取用户配置文件进行自定义配置(SpringBoot的核心思想之一)
       if (servantProxyConfig == null) 
           servantProxyConfig = createServantProxyConfig(objName, setDivision);
        else 
           servantProxyConfig.setCommunicatorId(communicator.getId());
           servantProxyConfig.setModuleName(communicator.getCommunicatorConfig().getModuleName(), communicator.getCommunicatorConfig().isEnableSet(), communicator.getCommunicatorConfig().getSetDivision());
           servantProxyConfig.setLocator(communicator.getCommunicatorConfig().getLocator());
           addSetDivisionInfo(servantProxyConfig, setDivision);
           servantProxyConfig.setRefreshInterval(communicator.getCommunicatorConfig().getRefreshEndpointInterval());
           servantProxyConfig.setReportInterval(communicator.getCommunicatorConfig().getReportInterval());
       
    
       //更新服务端点
       updateServantEndpoints(servantProxyConfig);
    
       //【重要】创建客户端负载均衡调用器
       if (loadBalance == null) 
           loadBalance = createLoadBalance(servantProxyConfig);
       
    
       //创建协议调用器
       if (protocolInvoker == null) 
           protocolInvoker = createProtocolInvoker(api, servantProxyConfig);
       
       return new ObjectProxy<T>(api, servantProxyConfig, loadBalance, protocolInvoker, communicator);
    
    
    ……
    
       //创建Servant服务的配置信息
       private ServantProxyConfig createServantProxyConfig(String objName, String setDivision) throws CommunicatorConfigException 
       ……
    
    
    ……
    
       //更新服务端点:通过ObjectName判断是有设置了服务器节点,如果有(本地只连接),如果没有那就从tars管理中获取服务器节点。放在ServantCacheManager管理起来。
       private void updateServantEndpoints(ServantProxyConfig cfg) 
       CommunicatorConfig communicatorConfig = communicator.getCommunicatorConfig();
       ……
    

通过上面的客户端启动流程源码分析,我们找到第一个核心点: 客户端的负载均衡调用器LoadBalance

*除了创建了一个负载均衡调用器LoadBalance,还创建了一个协议调用器protocolInvoker,该协议调用器里分别对同步与异步调用方法、Tars与Http协议请求处理、以及过滤器等相关配置,但我们的重点不在这,下面将着重分析LoadBalance

3. 客户端的负载均衡调用器LoadBalance

我们点进去查看原有负载均衡逻辑,发现这是一个接口,里面定义了两个方法,都是与负载均衡调用器相关的:

public interface LoadBalance<T> 

    /**
     * 选择负载均衡调用器
     * @param 调用的上下文
     * @return
     * @throws 无负载均衡调用器 - 异常
     */
    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;

    /**
     * 刷新本地负载均衡调用器
     * @param 负载均衡调用器
     */
    void refresh(Collection<Invoker<T>> invokers);

我们Ctrl+H一下即可发现该接口有四个实现类:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KJyHqhl5-1627523692062)(https://lexiangla.com/assets/3963f704ee0411ebbe94aee286d18512 "负载均衡调用器实现类")]

分别是:

  • ConsistentHashLoadBalance:一致hash选择器;
  • HashLoadBalance:hash选择器;
  • RoundRobinLoadBalance: 轮询选择器;
  • DefaultLoadBalance:默认的选择器(由源码可知先ConsistentHashLoadBalance,HashLoadBalance,RoundRobinLoadBalance);

需要注意实现类有四个,选择器有三个。这四个选择器都是一个构造方法+实现接口的两个方法,比较相近。下面我们只分析RoundRobinLoadBalance的select方法:

@Override
public Invoker<T> select(InvokeContext invocation) throws NoInvokerException 

    //静态权重缓存器列表
    List<Invoker<T>> staticWeightInvokers = staticWeightInvokersCache;

    //使用权重轮询
    if (staticWeightInvokers != null && !staticWeightInvokers.isEmpty()) 
        //【体现轮询】根据index获取一个调用器,规则是:获取“静态权重顺序递增值”的绝对值后对“静态权重缓存器数”取余?
        Invoker<T> invoker = staticWeightInvokers.get((staticWeightSequence.getAndIncrement() & Integer.MAX_VALUE) % staticWeightInvokers.size());
        //如果调用器存活则直接返回
        if (invoker.isAvailable()) return invoker;

        //判断存活:先根据调用器的url获取“调用器活动状态”,判断:状态的“上次重试时间”+“尝试重启时间间隔” < “系统当前时间”,存活则将系统当前时间设置为“上次重启时间”
        ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
        if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) 
            logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
            stat.setLastRetryTime(System.currentTimeMillis());
            return invoker;
        
    

    //无权重轮询,抛出异常
    List<Invoker<T>> sortedInvokers = sortedInvokersCache;
    if (CollectionUtils.isEmpty(sortedInvokers)) 
        throw new NoInvokerException("no such active connection invoker");
    

    List<Invoker<T>> list = new ArrayList<Invoker<T>>();
    for (Invoker<T> invoker : sortedInvokers) 
        //如果调用器挂了
        if (!invoker.isAvailable()) 

            //尝试救回调用器:先根据调用器的url获取“调用器活动状态”,判断:状态的“上次重试时间”+“尝试重启时间间隔” < “系统当前时间”,存活则加入到list中,挂了就不加入
            ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
            if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) 
                list.add(invoker);
            
         else 
            //调用器存活则将调用器添加到list里
            list.add(invoker);
        
    
    //TODO When all is not available. Whether to randomly extract one
    if (list.isEmpty()) 
        throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
    

    //随机获取一个调用器?
    Invoker<T> invoker = list.get((sequence.getAndIncrement() & Integer.MAX_VALUE) % list.size());

    //如果调用器不存活,则将当前系统时间设置为该调用器的上次重启时间
    if (!invoker.isAvailable()) 
        //Try to recall after blocking
        logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
        ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
    
    return invoker;

可以看出select方法重点还是在于“怎样”找到一个负载均衡调用器,只不过实现的方法不同,有的采用轮询的方法、有的根据hash值,而我们关注的是给负载均衡方法做扩展(增添路由规则),因此这里也不是重点。但为我们指明了一个方向,就是上面源码里反复提到的invoker调用器(invoker老眼熟了,SpringBoot里的controller参数处理里也有它)。

我们来看看Tars里的invoker,它也是一个接口,只有一个实现类,

public interface Invoker<T> 

    //获取uil
    Url getUrl();

    //获取api
    Class<T> getApi();

    //判断是否存活
    boolean isAvailable();

    //执行方法
    Object invoke(InvokeContext context) throws Throwable;

    //销毁方法
    void destroy();

通过对这几个实现类的源码阅读,我们发现invoke方法就是对doInvokeServant底层方法进行层层封装。

通过对TarsInvoker的源码阅读,我们还可以知道TarsInvoker有四个属性config、api、url、clients,对应前面提到的getXXX对应方法;还可以设置是否存活,对应前文对是否存活的判断。在doInvokeServant里最核心的操作流程是try里面的语句:

public class TarsInvoker<T> extends ServantInvoker<T> 

    final List<Filter> filters;

    public TarsInvoker(ServantProxyConfig config, Class<T> api, Url url, ServantClient[] clients) 
        super(config, api, url, clients);
        filters = AppContextManager.getInstance().getAppContext() == null ? null : AppContextManager.getInstance().getAppContext().getFilters(FilterKind.CLIENT);
    

    @Override
    public void setAvailable(boolean available) 
        super.setAvailable(available);
    

@Override
    protected Object doInvokeServant(final ServantInvokeContext inv) throws Throwable 
        final long begin = System.currentTimeMillis();
        int ret = Constants.INVOKE_STATUS_SUCC;
        try 
            //根据api获取将要执行的方法
            Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());

            //如果是异步调用
            if (inv.isAsync()) 
                //执行异步方法
                invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
                return null;
            //如果是承诺未来???
             else if (inv.isPromiseFuture()) 
                return invokeWithPromiseFuture(method, inv.getArguments(), inv.getAttachments());// return Future Result
             else 
                //执行同步方法
                TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
                ret = response.getRet() == TarsHelper.SERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
                if (response.getRet() != TarsHelper.SERVERSUCCESS) 
                    throw ServerException.makeException(response.getRet(), response.getRemark());
                
                return response.getResult();
            
         catch (Throwable e) 
            if (e instanceof TimeoutException) 
                ret = Constants.INVOKE_STATUS_TIMEOUT;
             else if (e instanceof NotConnectedException) 
                ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
             else 
                ret = Constants.INVOKE_STATUS_EXEC;
            
            throw e;
         finally 
            if (inv.isNormal()) 
                setAvailable(ServantInvokerAliveChecker.isAlive(getUrl(), config, ret));
                InvokeStatHelper.getInstance().addProxyStat(objName)
                        .addInvokeTimeByClient(config.getMasterName(), config.getSlaveName(), config.getSlaveSetName(), config.getSlaveSetArea(),
                                config.getSlaveSetID(), inv.getMethodName(), getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
            
        
    

    ……

而try语句里主要做的是执行的调用方法(异步、同步、承诺未来),由于我们要扩展的路由功能与调用方法无关,这里就不深入分析了。

由此我们可以分析得出负载均衡设计的底层结构图:

@EnableTarsServer注解:表明这是一个Tars服务;

  • @Import(TarsServerConfiguration.class):引入Tars服务相关配置文件;
    • Communcator:通信器;
    • getServantProxyFactory():获取代理工厂管理者;
    • getObjectProxyFactory():获取对象代理工厂;
      • createLoadBalance():创建客户端负载均衡调用器;
      • select():选择负载均衡调用器(有四种模式可以选择);
        • invoker:调用器;
        • invoke():具体的执行方法;
          • doInvokeServant():最底层的执行方法;
      • refresh():更新负载均衡调用器;
      • createProtocolInvoker():创建协议调用器;

最后

::: hljs-center

新人制作,如有错误,欢迎指出,感激不尽!

:::

::: hljs-center

欢迎关注公众号,会分享一些更日常的东西!

:::

::: hljs-center

如需转载,请标注出处!

:::

::: hljs-center

:::

以上是关于Tars | 第2篇 TarsJava SpingBoot启动与负载均衡源码初探 #yyds干货盘点#的主要内容,如果未能解决你的问题,请参考以下文章

tars源码解析1--服务端启动

tars源码解析1--服务端启动

Tars | 第6篇 基于TarsGo Subset路由规则的Java JDK实现方式(下)#yyds干货盘点#

Tars--------企业级入门实践篇

Sping Boot入门到实战之入门篇:Spring Boot属性配置

Sping Boot入门到实战之实战篇:实现自定义Spring Boot Starter——阿里云消息队列服务Starter