TARS Source Code Analysis 2: Client Startup

Posted Small leaf


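Before digging into the source code in earnest, I start from the questions I usually ask when reading a microservice framework:

  1. How does the client obtain the list of service nodes?
  2. How does the client maintain its connections to the service? Are they long-lived or short-lived, and what mechanism or framework is used?
  3. How does the client load-balance calls to the server, or is load balancing handled by hardware (at the network layer)?
  4. How do the client and server exchange data? What serialization protocol is used?
  5. How are connection problems between client and server handled?

This is what comes to mind for now. A microservice framework deals with far more than this, but these questions are enough as a starting point for reading one. Let's go through them in detail.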

Client startup code

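    // Configuration for a client started locally
    CommunicatorConfig cfg = new CommunicatorConfig();
    // Communicator for a client started locally
    Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
    // Warning: when the client is deployed and started on the TARS platform,
    // the communicator may only be obtained with the call below
    // Communicator communicator = CommunicatorFactory.getInstance().getCommunicator();
    HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
    // Synchronous call
    String ret = proxy.hello(1000, "Hello World");
    System.out.println(ret);

  1. CommunicatorConfig: the configuration of the client-side communicator (timeouts and a series of other settings).
  2. Communicator: the component that actually coordinates the calls.
  3. stringToProxy: wraps our Servant interface in a proxy. The proxy is backed by an ObjectProxy, so every Servant client we obtain later is in effect an ObjectProxy; to follow a call, go straight to its invoke method.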
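
To make the proxy step more concrete, here is a minimal sketch of a JDK dynamic proxy, the mechanism that the invoke(Object proxy, Method method, Object[] args) signature of ObjectProxy shown later suggests. HelloApi, EchoHandler, and DynamicProxySketch are hypothetical names used only for illustration; they are not TARS classes, and the handler returns a string instead of sending a request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface, standing in for a generated *Prx interface.
interface HelloApi {
    String hello(int no, String name);
}

// Hypothetical handler: every call made on the proxy arrives in invoke().
final class EchoHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // toString/hashCode/equals are answered locally instead of being sent out.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    return "HelloApi proxy";
                case "hashCode":
                    return System.identityHashCode(proxy);
                default:
                    return proxy == args[0]; // equals
            }
        }
        // A real client would pick an invoker and send a request at this point.
        return method.getName() + ":" + args[0] + ":" + args[1];
    }
}

final class DynamicProxySketch {
    static HelloApi create() {
        return (HelloApi) Proxy.newProxyInstance(
                HelloApi.class.getClassLoader(),
                new Class<?>[] {HelloApi.class},
                new EchoHandler());
    }

    public static void main(String[] args) {
        HelloApi api = create();
        System.out.println(api.hello(1000, "Hello World")); // prints "hello:1000:Hello World"
    }
}
```

The caller only ever sees the interface; which server is contacted and how the bytes travel is decided inside the handler.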

Key classes

1. CommunicatorConfig

2. Communicator

3. ServantProxyFactory

    public <T> Object getServantProxy(Class<T> clazz, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                      LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
        String key = setDivision != null ? clazz.getSimpleName() + objName + setDivision : clazz.getSimpleName() + objName;
        Object proxy = cache.get(key);
        if (proxy == null) {
            lock.lock();
            try {
                proxy = cache.computeIfAbsent(key, param -> {
                    ObjectProxy<T> objectProxy = communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, setDivision, servantProxyConfig, loadBalance, protocolInvoker);
                    return createProxy(clazz, objectProxy);
                });
            } finally {
                lock.unlock();
            }
        }
        return proxy;
    }

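ServantProxyFactory manages the servant proxies: if a proxy is already in the cache it is returned, otherwise a new one is created through ObjectProxyFactory.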
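
The cache-or-create behavior can be sketched in isolation as follows. This is a simplified illustration, not the TARS implementation: ProxyCacheSketch and its factory parameter are hypothetical, and it relies only on ConcurrentHashMap.computeIfAbsent, without the extra lock used in the code above. The key layout follows getServantProxy.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ProxyCacheSketch {
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
    // Counts how many times the factory actually ran (for demonstration only).
    final AtomicInteger created = new AtomicInteger();

    // Same key layout as getServantProxy: simple class name + object name (+ set division).
    static String key(Class<?> api, String objName, String setDivision) {
        return setDivision != null
                ? api.getSimpleName() + objName + setDivision
                : api.getSimpleName() + objName;
    }

    // Returns the cached proxy for the key, creating it at most once.
    Object get(Class<?> api, String objName, String setDivision, Function<String, Object> factory) {
        return cache.computeIfAbsent(key(api, objName, setDivision), k -> {
            created.incrementAndGet();
            return factory.apply(k);
        });
    }

    public static void main(String[] args) {
        ProxyCacheSketch cache = new ProxyCacheSketch();
        Object a = cache.get(Runnable.class, "TestApp.HelloServer.HelloObj", null, k -> new Object());
        Object b = cache.get(Runnable.class, "TestApp.HelloServer.HelloObj", null, k -> new Object());
        System.out.println(a == b);              // prints "true"
        System.out.println(cache.created.get()); // prints "1"
    }
}
```

Because the set division is part of the key, the same object name requested with a different set division yields a separate proxy.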

4. ObjectProxyFactory

    public <T> ObjectProxy<T> getObjectProxy(Class<T> api, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                             LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException {
        if (servantProxyConfig == null) {
            servantProxyConfig = createServantProxyConfig(objName, setDivision);
        } else {
            servantProxyConfig.setCommunicatorId(communicator.getId());
            servantProxyConfig.setModuleName(communicator.getCommunicatorConfig().getModuleName(), communicator.getCommunicatorConfig().isEnableSet(), communicator.getCommunicatorConfig().getSetDivision());
            servantProxyConfig.setLocator(communicator.getCommunicatorConfig().getLocator());
            if (StringUtils.isNotEmpty(setDivision)) {
                servantProxyConfig.setSetDivision(setDivision);
            }
        }

        updateServantEndpoints(servantProxyConfig);

        if (loadBalance == null) {
            loadBalance = createLoadBalance(servantProxyConfig);
        }

        if (protocolInvoker == null) {
            protocolInvoker = createProtocolInvoker(api, servantProxyConfig);
        }
        return new ObjectProxy<T>(api, servantProxyConfig, loadBalance, protocolInvoker, communicator);
    }

ObjectProxyFactory creates the ObjectProxy.

  1. Create the ServantProxyConfig: the configuration of a Servant service; each service may be configured differently.
  2. updateServantEndpoints:

    private void updateServantEndpoints(ServantProxyConfig cfg) {
        CommunicatorConfig communicatorConfig = communicator.getCommunicatorConfig();

        String endpoints = null;
        if (!ParseTools.hasServerNode(cfg.getObjectName()) && !cfg.isDirectConnection() && !communicatorConfig.getLocator().startsWith(cfg.getSimpleObjectName())) {
            try {
                /** query server nodes from registerServer */
                if (RegisterManager.getInstance().getHandler() != null) {
                    endpoints = ParseTools.parse(RegisterManager.getInstance().getHandler().query(cfg.getSimpleObjectName()),
                            cfg.getSimpleObjectName());
                } else {
                    endpoints = communicator.getQueryHelper().getServerNodes(cfg);
                }
                if (StringUtils.isEmpty(endpoints)) {
                    throw new CommunicatorConfigException(cfg.getSimpleObjectName(), "servant node is empty on get by registry! communicator id=" + communicator.getId());
                }
                ServantCacheManager.getInstance().save(communicator.getId(), cfg.getSimpleObjectName(), endpoints, communicatorConfig.getDataPath());
            } catch (CommunicatorConfigException e) {
                /** If it fails, pull it from the local cache  file */
                endpoints = ServantCacheManager.getInstance().get(communicator.getId(), cfg.getSimpleObjectName(), communicatorConfig.getDataPath());
                logger.error(cfg.getSimpleObjectName() + " error occurred on get by registry, use by local cache=" + endpoints + "|" + e.getLocalizedMessage(), e);
            }

            if (StringUtils.isEmpty(endpoints)) {
                throw new CommunicatorConfigException(cfg.getSimpleObjectName(), "error occurred on create proxy, servant endpoint is empty! locator =" + communicatorConfig.getLocator() + "|communicator id=" + communicator.getId());
            }
            cfg.setObjectName(endpoints);
        }

        if (StringUtils.isEmpty(cfg.getObjectName())) {
            throw new CommunicatorConfigException(cfg.getSimpleObjectName(), "error occurred on create proxy, servant endpoint is empty!");
        }
    }

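updateServantEndpoints checks whether the ObjectName already specifies server nodes. If it does, the client connects to them directly (the local direct-connection case). If it does not, the nodes are queried from the TARS registry and saved through ServantCacheManager; when the registry query fails, the saved copy in the local cache file is used instead.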
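
The "registry first, local cache as fallback" flow can be sketched on its own like this. EndpointResolverSketch is hypothetical: the registry is modeled as a plain function, the cache as an in-memory map rather than a file, and the check for "@" is only an assumed stand-in for ParseTools.hasServerNode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class EndpointResolverSketch {
    // Stand-in for the local cache file managed by ServantCacheManager.
    private final Map<String, String> localCache = new HashMap<>();

    String resolve(String objectName, Function<String, String> registry) {
        // Assumption: a name that already carries "@..." names its nodes directly.
        if (objectName.contains("@")) {
            return objectName;
        }
        String endpoints;
        try {
            endpoints = registry.apply(objectName);
            if (endpoints == null || endpoints.isEmpty()) {
                throw new IllegalStateException("servant node is empty");
            }
            localCache.put(objectName, endpoints);  // remember the last good answer
        } catch (RuntimeException e) {
            endpoints = localCache.get(objectName); // registry failed: use the cached answer
        }
        if (endpoints == null || endpoints.isEmpty()) {
            throw new IllegalStateException("servant endpoint is empty: " + objectName);
        }
        return endpoints;
    }

    public static void main(String[] args) {
        EndpointResolverSketch resolver = new EndpointResolverSketch();
        // First query succeeds and fills the cache.
        System.out.println(resolver.resolve("TestApp.HelloServer.HelloObj",
                name -> name + "@tcp -h 127.0.0.1 -p 18601"));
        // Second query fails, so the cached endpoints are returned.
        System.out.println(resolver.resolve("TestApp.HelloServer.HelloObj",
                name -> { throw new IllegalStateException("registry down"); }));
    }
}
```

Both calls in main print the same endpoint string; the second one comes from the cache.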

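5. LoadBalance

Client-side load-balancing selectors

  1. ConsistentHashLoadBalance: consistent-hash selector
  2. HashLoadBalance: hash selector
  3. RoundRobinLoadBalance: round-robin selector
  4. DefaultLoadBalance: the default selector, which tries ConsistentHashLoadBalance first, then HashLoadBalance, then RoundRobinLoadBalance

The source shows that three load-balancing strategies are provided. The default is DefaultLoadBalance (which is set up with RoundRobinLoadBalance).
Let's analyze RoundRobinLoadBalance.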

    public Invoker<T> select(InvokeContext invocation) throws NoInvokerException {
        List<Invoker<T>> staticWeightInvokers = staticWeightInvokersCache;

        //polling  use weights
        if (staticWeightInvokers != null && !staticWeightInvokers.isEmpty()) {
            Invoker<T> invoker = staticWeightInvokers.get((staticWeightSequence.getAndIncrement() & Integer.MAX_VALUE) % staticWeightInvokers.size());
            if (invoker.isAvailable()) return invoker;

            ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
            if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
                //Try to recall after blocking
                logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
                stat.setLastRetryTime(System.currentTimeMillis());
                return invoker;
            }
        }

        //polling without weights
        List<Invoker<T>> sortedInvokers = sortedInvokersCache;
        if (CollectionUtils.isEmpty(sortedInvokers)) {
            throw new NoInvokerException("no such active connection invoker");
        }

        List<Invoker<T>> list = new ArrayList<Invoker<T>>();
        for (Invoker<T> invoker : sortedInvokers) {
            if (!invoker.isAvailable()) {
                /**
                 * Try to recall after blocking
                 */
                ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
                if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
                    list.add(invoker);
                }
            } else {
                list.add(invoker);
            }
        }
        //TODO When all is not available. Whether to randomly extract one
        if (list.isEmpty()) {
            throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
        }

        Invoker<T> invoker = list.get((sequence.getAndIncrement() & Integer.MAX_VALUE) % list.size());

        if (!invoker.isAvailable()) {
            //Try to recall after blocking
            logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
            ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
        }
        return invoker;
    }

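In short, it walks the Invoker list in round-robin order and picks an available Invoker. An Invoker that is currently marked unavailable becomes eligible again once the retry interval (getTryTimeInterval) has elapsed since its last retry.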
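
The index arithmetic used in select is worth isolating. The sketch below uses a hypothetical RoundRobinSketch class with a plain list of strings instead of Invokers; it shows why the counter is masked with Integer.MAX_VALUE before the modulo: once the int counter overflows to a negative value, the mask clears the sign bit so the index stays within bounds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch<T> {
    private final AtomicInteger sequence;

    RoundRobinSketch(int start) {
        this.sequence = new AtomicInteger(start);
    }

    T select(List<T> candidates) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no candidates");
        }
        // Masking keeps the value non-negative even after the counter wraps around.
        int index = (sequence.getAndIncrement() & Integer.MAX_VALUE) % candidates.size();
        return candidates.get(index);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("a", "b", "c");
        RoundRobinSketch<String> fromZero = new RoundRobinSketch<>(0);
        for (int i = 0; i < 4; i++) {
            System.out.print(fromZero.select(nodes) + " "); // prints "a b c a "
        }
        System.out.println();
        // Starting at Integer.MAX_VALUE forces an overflow on the second call.
        RoundRobinSketch<String> nearOverflow = new RoundRobinSketch<>(Integer.MAX_VALUE);
        System.out.println(nearOverflow.select(nodes)); // prints "b"
        System.out.println(nearOverflow.select(nodes)); // prints "a"
    }
}
```

Without the mask, a negative counter would produce a negative remainder and list.get would throw.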

6. ProtocolInvoker


    public interface ProtocolInvoker<T> {

        InvokeContext createContext(Object proxy, Method method, Object[] args) throws Exception;

        Invoker<T> create(Class<T> api, Url url) throws Exception;

        Collection<Invoker<T>> getInvokers();

        void refresh();

        void destroy();
    }

The interface creates the invoke context, creates Invokers, and returns the current Invokers.

    @Override
    public Invoker<T> create(Class<T> api, Url url) throws Exception {
        return new TarsInvoker<T>(servantProxyConfig, api, url, getClients(url));
    }

    protected ServantClient[] getClients(Url url) throws IOException {
        int connections = url.getParameter(Constants.TARS_CLIENT_CONNECTIONS, Constants.default_connections);
        ServantClient[] clients = new ServantClient[connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = initClient(url);
        }
        return clients;
    }

    protected ServantClient initClient(Url url) {
        ServantClient client = null;
        try {
            boolean tcpNoDelay = url.getParameter(Constants.TARS_CLIENT_TCPNODELAY, false);
            long connectTimeout = url.getParameter(Constants.TARS_CLIENT_CONNECTTIMEOUT, Constants.default_connect_timeout);
            long syncTimeout = url.getParameter(Constants.TARS_CLIENT_SYNCTIMEOUT, Constants.default_sync_timeout);
            long asyncTimeout = url.getParameter(Constants.TARS_CLIENT_ASYNCTIMEOUT, Constants.default_async_timeout);
            boolean udpMode = url.getParameter(Constants.TARS_CLIENT_UDPMODE, false);

            if (this.selectorManager == null) {
                this.selectorManager = ClientPoolManager.getSelectorManager(this.protocolFactory, this.threadPoolExecutor, true, udpMode, this.servantProxyConfig);
            }

            client = new ServantClient(url.getHost(), url.getPort(), this.selectorManager, udpMode);
            client.setConnectTimeout(connectTimeout);
            client.setSyncTimeout(syncTimeout);
            client.setAsyncTimeout(asyncTimeout);
            client.setTcpNoDelay(tcpNoDelay);
        } catch (Throwable e) {
            throw new ClientException(servantProxyConfig.getSimpleObjectName(), "Fail to create client|" + url.toIdentityString() + "|" + e.getLocalizedMessage(), e);
        }
        return client;
    }

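create builds an Invoker (the handler that connects to the server). The url is either the one we set locally or the one obtained from the registry, and each Invoker holds as many ServantClient connections as the url's connection count asks for.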
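
As a rough illustration of where the host, port, and timeout values read by initClient come from, the sketch below parses an endpoint string of the form used in the startup example ("tcp -h 127.0.0.1 -p 18601 -t 60000"). EndpointSketch and its methods are hypothetical; TarsJava has its own Url and parsing classes, which are not reproduced here. The timeout accessor takes a default, in the spirit of url.getParameter(key, defaultValue).

```java
import java.util.HashMap;
import java.util.Map;

final class EndpointSketch {
    final String protocol;
    private final Map<String, String> options = new HashMap<>();

    // Parses "tcp -h <host> -p <port> [-t <timeout ms>]" into a protocol and flag/value pairs.
    EndpointSketch(String text) {
        String[] tokens = text.trim().split("\\s+");
        protocol = tokens[0];
        for (int i = 1; i + 1 < tokens.length; i += 2) {
            options.put(tokens[i], tokens[i + 1]);
        }
    }

    // Takes the part after '@' of a full object name such as "App.Server.Obj@tcp -h ...".
    static EndpointSketch fromObjectName(String objectName) {
        return new EndpointSketch(objectName.substring(objectName.indexOf('@') + 1));
    }

    String host() {
        return options.get("-h");
    }

    int port() {
        return Integer.parseInt(options.get("-p"));
    }

    // Returns the "-t" value, or the supplied default when the flag is absent.
    long timeout(long defaultMs) {
        String value = options.get("-t");
        return value == null ? defaultMs : Long.parseLong(value);
    }

    public static void main(String[] args) {
        EndpointSketch ep = EndpointSketch.fromObjectName(
                "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
        System.out.println(ep.protocol + " " + ep.host() + ":" + ep.port() + " timeout=" + ep.timeout(3000));
    }
}
```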

refresh reloads the current set of Invokers.

At this point the core ObjectProxy has been created.

ObjectProxy

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        InvokeContext context = protocolInvoker.createContext(proxy, method, args);
        try {
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return this.toString();
            } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return this.hashCode();
            } else if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return this.equals(args[0]);
            } else if ("getObjectName".equals(methodName) && parameterTypes.length == 0) {
                return this.getObjectName();
            } else if ("getApi".equals(methodName) && parameterTypes.length == 0) {
                return this.getApi();
            } else if ("getConfig".equals(methodName) && parameterTypes.length == 0) {
                return this.getConfig();
            } else if ("destroy".equals(methodName) && parameterTypes.length == 0) {
                this.destroy();
                return null;
            } else if ("refresh".equals(methodName) && parameterTypes.length == 0) {
                this.refresh();
                return null;
            }

            Invoker invoker = loadBalancer.select(context);
            return invoker.invoke(context);
        } catch (Throwable e) {
            e.printStackTrace();
            if (logger.isDebugEnabled()) {
                logger.debug(servantProxyConfig.getSimpleObjectName() + " error occurred on invoke|" + e.getLocalizedMessage(), e);
            }
            if (e instanceof NoInvokerException) {
                throw new NoConnectionException(servantProxyConfig.getSimpleObjectName(), e.getLocalizedMessage(), e);
            }
            throw new ClientException(servantProxyConfig.getSimpleObjectName(), e.getLocalizedMessage(), e);
        }
    }

Calling a method on a client-side *Prx proxy ends up in ObjectProxy.invoke:

  1. Get the method name and parameter types of the interface method
  2. Create the InvokeContext
  3. loadBalancer selects an available Invoker (a connection to the server)
  4. TarsInvoker.invoke

    protected Object doInvokeServant(final ServantInvokeContext inv) throws Throwable {
        final long begin = System.currentTimeMillis();
        int ret = Constants.INVOKE_STATUS_SUCC;
        try {
            Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());
            if (inv.isAsync()) {
                invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
                return null;
            } else if (inv.isPromiseFuture()) {
                return invokeWithPromiseFuture(method, inv.getArguments(), inv.getAttachments());// return Future Result
            } else {
                TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
                ret = response.getRet() == TarsHelper.SERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
                if (response.getRet() != TarsHelper.SERVERSUCCESS) {
                    throw ServerException.makeException(response.getRet());
                }
                return response.getResult();
            }
        } catch (Throwable e) {
            if (e instanceof TimeoutException) {
                ret = Constants.INVOKE_STATUS_TIMEOUT;
            } else if (e instanceof NotConnectedException) {
                ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
            } else {
                ret = Constants.INVOKE_STATUS_EXEC;
            }
            throw e;
        } finally {
            if (inv.isNormal()) {
                setAvailable(ServantInvokerAliveChecker.isAlive(getUrl(), config, ret));
                InvokeStatHelper.getInstance().addProxyStat(objName)
                        .addInvokeTimeByClient(config.getMasterName(), config.getSlaveName(), config.getSlaveSetName(), config.getSlaveSetArea(),
                                config.getSlaveSetID(), inv.getMethodName(), getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
            }
        }
    }
  1. The invocation context tells which kind of call this is: asynchronous, promise/future, or synchronous.
  2. Taking the synchronous call as an example:

    private TarsServantResponse invokeWithSync(Method method, Object args[], Map<String, String> context) throws Throwable {
        ServantClient client = getClient();
        TarsServantRequest request = new TarsServantRequest(client.getiosession());
        request.setVersion(TarsHelper.VERSION);
        request.setMessageType(isHashInvoke(context) ? TarsHelper.MESSAGETYPEHASH : TarsHelper.MESSAGETYPENULL);
        request.setPacketType(TarsHelper.NORMAL);
        request.setServantName(objName);
        request.setFunctionName(method.getName());
        request.setApi(super.getApi());
        request.setMethodInfo(AnalystManager.getInstance().getMethodMap(super.getApi()).get(method));
        request.setMethodParameters(args);
        request.setContext(context);
        request.setInvokeStatus(InvokeStatus.SYNC_CALL);

        TarsServantResponse response = new TarsServantResponse(request.getIoSession());
        response.setRequest(request);
        response.setRequestId(request.getTicketNumber());
        response.setVersion(request.getVersion());
        response.setPacketType(request.getPacketType());
        response.setMessageType(request.getMessageType());
        response.setStatus(request.getStatus());
        response.setRequest(request);
        response.setCharsetName(request.getCharsetName());
        response.setTimeout(request.getTimeout());
        response.setContext(request.getContext());

        DistributedContext distributedContext = DistributedContextManager.getDistributedContext();
        Boolean bDyeing = distributedContext.get(DyeingSwitch.BDYEING);
        if (bDyeing != null && bDyeing == true) {
            request.setMessageType(request.getMessageType() | TarsHelper.MESSAGETYPEDYED);
            HashMap<String, String> status = new HashMap<String, String>();
            String routeKey = distributedContext.get(DyeingSwitch.DYEINGKEY);
            String fileName = distributedContext.get(DyeingSwitch.FILENAME);
            status.put(DyeingSwitch.STATUS_DYED_KEY, routeKey == null ? "" : routeKey);
            status.put(DyeingSwitch.STATUS_DYED_FILENAME, fileName == null ? "" : fileName);
            request.setStatus(status);
        }

        FilterChain filterChain = new TarsClientFilterChain(filters, objName, FilterKind.CLIENT, client, InvokeStatus.SYNC_CALL, null);
        filterChain.doFilter(request, response);
        return response;
    }
  1. Create the TARS request and response objects
  2. Run the filters first; this works the same way as an HTTP filter chain
  3. TarsClientFilterChain.doRealInvoke

    @Override
    protected void doRealInvoke(Request request, Response response) throws Throwable {
        if (request instanceof TarsServantRequest && target != null) {
            TarsServantResponse tarsServantResponse = (TarsServantResponse) response;
            switch (type) {
                case SYNC_CALL:
                    try {
                        TarsServantResponse result = target.invokeWithSync((ServantRequest) request);
                        BeanAccessor.setBeanValue(tarsServantResponse, "cause", result.getCause());
                        BeanAccessor.setBeanValue(tarsServantResponse, "result", result.getResult());
                        BeanAccessor.setBeanValue(tarsServantResponse, "ret", result.getRet());
                    } catch (Exception e) {
                        BeanAccessor.setBeanValue(tarsServantResponse, "cause", e);
                        throw e;
                    }
                    return;
                case ASYNC_CALL:
                    target.invokeWithAsync((ServantRequest) request, callback);
                    return;
                case FUTURE_CALL:
                    target.invokeWithFuture((ServantRequest) request, callback);
                    return;
            }
        }
    }

    public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
        Ticket<T> ticket = null;
        T response = null;
        try {
            ensureConnected();
            request.setInvokeStatus(InvokeStatus.SYNC_CALL);
            ticket = TicketManager.createTicket(request, session, this.syncTimeout);

            Session current = session;
            current.write(request);
            if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
                if (current != null && current.getStatus() != SessionStatus.CLIENT_CONNECTED) {
                    throw new IOException("Connection reset by peer|" + this.getAddress());
                } else {
                    throw new TimeoutException("the operation has timeout, " + this.syncTimeout + "ms|" + this.getAddress());
                }
            }
            response = ticket.response();
            if (response == null) {
                throw new IOException("the operation is failed.");
            }
            return response;
        } catch (InterruptedException e) {
            logger.error(e.getLocalizedMessage());
        } finally {
            if (ticket != null) {
                TicketManager.removeTicket(ticket.getTicketNumber());
            }
        }
        return response;
    }


The call is made synchronously. Before sending, the client makes sure the connection is established:

    public void ensureConnected() throws IOException {
        if (isNotConnected()) {
            reConnect();
        }
    }

    protected synchronized void reConnect() throws IOException {
        if (isNotConnected()) {
            SocketAddress server = new InetSocketAddress(this.host, this.port);
            SelectableChannel channel = null;
            Session temp = null;
            int event;

            if (this.udpMode) {
                channel = DatagramChannel.open();
                channel.configureBlocking(false);

                temp = new UDPSession(this.selectorManager);
                ((UDPSession) temp).setBufferSize(bufferSize);
                ((UDPSession) temp).setTarget(server);
                event = SelectionKey.OP_READ;
                temp.setStatus(SessionStatus.CLIENT_CONNECTED);
            } else {
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                try {
                    if (this.tc != INVALID_TRAFFIC_CLASS_VALUE) {
                        ((SocketChannel) channel).socket().setTrafficClass(this.tc);
                    }
                } catch (Exception ex) {
                    logger.error(ex.getLocalizedMessage());
                }
                ((SocketChannel) channel).connect(server);

                temp = new TCPSession(this.selectorManager);
                ((TCPSession) temp).setTcpNoDelay(this.tcpNoDelay);
                event = SelectionKey.OP_CONNECT;
            }

            temp.setChannel(channel);
            temp.setKeepAlive(selectorManager.isKeepAlive());

            this.selectorManager.nextReactor().registerChannel(channel, event, temp);

            if (!this.udpMode) {
                if (!temp.waitToConnect(this.connectTimeout)) {
                    temp.asyncClose();
                    throw new TimeoutException("connect " + this.connectTimeout + "ms timed out to " + this.getAddress());
                }

                if (temp.getStatus() == SessionStatus.NOT_CONNECTED) {
                    temp.asyncClose();
                    throw new NotConnectedException("connect failed to " + this.getAddress());
                } else if (temp.getStatus() == SessionStatus.CLOSED) {
                    throw new NotConnectedException("connect failed to " + this.getAddress());
                }
            }
            this.session = temp;
        }
    }

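If there is no connection yet, the client connects to the server over TCP (or UDP). Reads and writes go through non-blocking NIO channels registered with a reactor, and the Response is then returned to the caller for processing.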
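
Since the I/O is non-blocking, the calling thread needs some way to wait for its own response. In invokeWithSync that is the job of the ticket: the caller creates a ticket, writes the request, and blocks on the ticket until the response arrives or the timeout expires. The sketch below shows this pattern with a CountDownLatch. TicketSketch and its nested Ticket class are hypothetical and much simpler than the TARS TicketManager; the "I/O thread" is simulated with a plain Thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TicketSketch {
    static final class Ticket {
        final int id;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String response;

        Ticket(int id) {
            this.id = id;
        }

        // Blocks until the response is delivered; returns false on timeout or interruption.
        boolean await(long timeoutMs) {
            try {
                return done.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        String response() {
            return response;
        }
    }

    private final Map<Integer, Ticket> pending = new ConcurrentHashMap<>();
    private final AtomicInteger ids = new AtomicInteger();

    Ticket create() {
        Ticket ticket = new Ticket(ids.incrementAndGet());
        pending.put(ticket.id, ticket);
        return ticket;
    }

    // Called by the I/O side once a response carrying this id has been decoded.
    void complete(int id, String response) {
        Ticket ticket = pending.get(id);
        if (ticket != null) {
            ticket.response = response;
            ticket.done.countDown();
        }
    }

    void remove(int id) {
        pending.remove(id);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TicketSketch manager = new TicketSketch();
        Ticket ticket = manager.create();
        // Simulated I/O thread delivering the response for this ticket.
        new Thread(() -> manager.complete(ticket.id, "pong")).start();
        try {
            System.out.println(ticket.await(1000) ? ticket.response() : "timeout");
        } finally {
            manager.remove(ticket.id); // always clean up, as invokeWithSync does in finally
        }
    }
}
```

Removing the ticket in a finally block matters: a response that arrives after the caller has timed out then finds no ticket and is simply dropped.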

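That completes the client request. The details will be analyzed later.

Now that we know how TARS starts a server and how a client makes a call, what about using annotations with Spring Boot? For anyone who knows the Spring source code well this is very simple; the next article analyzes how Spring Boot uses annotations.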
