Tars Source Code Analysis 2: Client Startup
Posted Small leaf
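Before analyzing the source in earnest, I approached it the way I have read other microservice frameworks, by asking a few questions:
- How does the client obtain the list of service nodes?
- How does the client maintain its connections to the service? Are they long-lived or short-lived, and what mechanism or framework is used?
- How does the client load-balance calls to the server, or is load balancing left to hardware (the network layer)?
- How do the client and server exchange data? What serialization protocol is used?
- What happens when the connection between client and server fails?
These are the questions that came to mind. Microservice frameworks handle far more than this, but they are enough for a first read of a framework. Now for the details.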
// Configuration used when starting locally
CommunicatorConfig cfg = new CommunicatorConfig();
// Communicator created from the local configuration
Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
// Warning: when deployed and started on the Tars platform, only the no-argument method below may be used to obtain the communicator
//Communicator communicator = CommunicatorFactory.getInstance().getCommunicator();
HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
// Synchronous call
String ret = proxy.hello(1000, "Hello World");
System.out.println(ret);
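- CommunicatorConfig: the communicator's configuration (timeouts and a range of other settings).
- Communicator: the object that actually coordinates the calls.
- stringToProxy: wraps our Servant interface in a proxy backed by ObjectProxy. Every Servant client we obtain later is an ObjectProxy underneath, so to follow a call, go straight to its invoke method.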
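To make the proxy idea concrete, here is a minimal sketch of a JDK dynamic proxy. It assumes that createProxy is built on java.lang.reflect.Proxy, which the invoke(Object, Method, Object[]) signature shown later suggests but this article does not confirm. GreeterPrx, EchoHandler and ProxySketch are hypothetical names, not Tars classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical stand-in for a generated client interface such as HelloPrx.
interface GreeterPrx {
    String hello(int no, String name);
}

// Hypothetical handler playing the role that ObjectProxy plays in Tars.
class EchoHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Object methods are answered locally instead of going over the network.
        if ("toString".equals(method.getName()) && method.getParameterCount() == 0) {
            return "GreeterPrx(proxy)";
        }
        // A real handler would select a node and send a request here.
        return "invoked " + method.getName() + " with " + Arrays.toString(args);
    }
}

class ProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> api, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[]{api}, handler);
    }

    public static void main(String[] args) {
        GreeterPrx prx = createProxy(GreeterPrx.class, new EchoHandler());
        System.out.println(prx.hello(1000, "Hello World"));
    }
}
```

Every call on prx lands in EchoHandler.invoke, which is why reading ObjectProxy.invoke is enough to follow a Tars client call.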
Several key classes
1. CommunicatorConfig
2. Communicator
3. ServantProxyFactory
public <T> Object getServantProxy(Class<T> clazz, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                  LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
    String key = setDivision != null ? clazz.getSimpleName() + objName + setDivision : clazz.getSimpleName() + objName;
    Object proxy = cache.get(key);
    if (proxy == null) {
        lock.lock();
        try {
            proxy = cache.computeIfAbsent(key, param -> {
                ObjectProxy<T> objectProxy = communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, setDivision, servantProxyConfig, loadBalance, protocolInvoker);
                return createProxy(clazz, objectProxy);
            });
        } finally {
            lock.unlock();
        }
    }
    return proxy;
}
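ServantProxyFactory manages the servant proxies: if one is already cached it is returned, otherwise a new one is produced through ObjectProxyFactory.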
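The get-or-create behaviour can be sketched in isolation. This is a simplified illustration, not the Tars implementation: ProxyCacheSketch is a hypothetical class, and it assumes the cache is a ConcurrentHashMap, where computeIfAbsent is already atomic per key, so the explicit lock is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical cache that mirrors the key scheme of getServantProxy.
class ProxyCacheSketch {
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

    // Key = interface simple name + object name (+ set division when present).
    static String key(Class<?> api, String objName, String setDivision) {
        String base = api.getSimpleName() + objName;
        return setDivision != null ? base + setDivision : base;
    }

    // Returns the cached proxy, creating it at most once per key.
    Object getOrCreate(Class<?> api, String objName, String setDivision, Supplier<Object> factory) {
        return cache.computeIfAbsent(key(api, objName, setDivision), k -> factory.get());
    }
}
```

Because the set division is part of the key, the same interface and object name in two different sets yield two separate proxies.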
4. ObjectProxyFactory
public <T> ObjectProxy<T> getObjectProxy(Class<T> api, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                         LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException {
    if (servantProxyConfig == null) {
        servantProxyConfig = createServantProxyConfig(objName, setDivision);
    } else {
        servantProxyConfig.setCommunicatorId(communicator.getId());
        servantProxyConfig.setModuleName(communicator.getCommunicatorConfig().getModuleName(), communicator.getCommunicatorConfig().isEnableSet(), communicator.getCommunicatorConfig().getSetDivision());
        servantProxyConfig.setLocator(communicator.getCommunicatorConfig().getLocator());
        if (StringUtils.isNotEmpty(setDivision)) {
            servantProxyConfig.setSetDivision(setDivision);
        }
    }
    updateServantEndpoints(servantProxyConfig);
    if (loadBalance == null) {
        loadBalance = createLoadBalance(servantProxyConfig);
    }
    if (protocolInvoker == null) {
        protocolInvoker = createProtocolInvoker(api, servantProxyConfig);
    }
    return new ObjectProxy<T>(api, servantProxyConfig, loadBalance, protocolInvoker, communicator);
}
This factory produces the ObjectProxy:
- Create the ServantProxyConfig: the configuration of a Servant service, which may differ from service to service
- updateServantEndpoints
private void updateServantEndpoints(ServantProxyConfig cfg) {
    CommunicatorConfig communicatorConfig = communicator.getCommunicatorConfig();
    String endpoints = null;
    if (!ParseTools.hasServerNode(cfg.getObjectName()) && !cfg.isDirectConnection() && !communicatorConfig.getLocator().startsWith(cfg.getSimpleObjectName())) {
        try {
            /** query server nodes from registerServer */
            if (RegisterManager.getInstance().getHandler() != null) {
                endpoints = ParseTools.parse(RegisterManager.getInstance().getHandler().query(cfg.getSimpleObjectName()),
                        cfg.getSimpleObjectName());
            } else {
                endpoints = communicator.getQueryHelper().getServerNodes(cfg);
            }
            if (StringUtils.isEmpty(endpoints)) {
                throw new CommunicatorConfigException(cfg.getSimpleObjectName(), "servant node is empty on get by registry! communicator id=" + communicator.getId());
            }
            ServantCacheManager.getInstance().save(communicator.getId(), cfg.getSimpleObjectName(), endpoints, communicatorConfig.getDataPath());
        } catch (CommunicatorConfigException e) {
            /** If it fails, pull it from the local cache file */
            endpoints = ServantCacheManager.getInstance().get(communicator.getId(), cfg.getSimpleObjectName(), communicatorConfig.getDataPath());
            logger.error(cfg.getSimpleObjectName() + " error occurred on get by registry, use by local cache=" + endpoints + "|" + e.getLocalizedMessage(), e);
        }
        if (StringUtils.isEmpty(endpoints)) {
            throw new CommunicatorConfigException(cfg.getSimpleObjectName(), "error occurred on create proxy, servant endpoint is empty! locator =" + communicatorConfig.getLocator() + "|communicator id=" + communicator.getId());
        }
        cfg.setObjectName(endpoints);
    }
    if (StringUtils.isEmpty(cfg.getObjectName())) {
        throw new CommunicatorConfigException(cfg.getSimpleObjectName(), "error occurred on create proxy, servant endpoint is empty!");
    }
}
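The ObjectName tells the client whether server nodes were set explicitly. If they were, the client connects to those nodes directly (a local direct connection). If not, it fetches the nodes from the Tars registry and hands them to ServantCacheManager, which keeps a local copy to fall back on when the registry query fails.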
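The lookup order described above (explicit nodes, then registry, then local cache) can be sketched as follows. This is a simplified illustration under stated assumptions: EndpointResolverSketch is a hypothetical class, the registry and the cache are modelled as plain functions, and the check for an "@" in the object name is my assumption about what ParseTools.hasServerNode tests. Saving fresh results back to the cache is omitted.

```java
import java.util.function.Function;

// Hypothetical resolver: explicit nodes first, then registry, then local cache.
class EndpointResolverSketch {
    // Assumption: an object name containing "@" already carries its endpoints.
    static boolean hasServerNode(String objName) {
        return objName != null && objName.indexOf('@') > 0;
    }

    static String resolve(String objName, Function<String, String> registry, Function<String, String> localCache) {
        if (hasServerNode(objName)) {
            return objName; // direct connection, no lookup needed
        }
        String endpoints;
        try {
            endpoints = registry.apply(objName);
            if (endpoints == null || endpoints.isEmpty()) {
                throw new IllegalStateException("registry returned no nodes for " + objName);
            }
        } catch (RuntimeException e) {
            // Registry unavailable or empty: fall back to the last known nodes.
            endpoints = localCache.apply(objName);
        }
        if (endpoints == null || endpoints.isEmpty()) {
            throw new IllegalStateException("no endpoint for " + objName);
        }
        return endpoints;
    }
}
```

The fallback means a client that has resolved a service once can still start while the registry is temporarily unreachable.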
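5. LoadBalance
Client-side load-balancing selectors:
- ConsistentHashLoadBalance: consistent-hash selector
- HashLoadBalance: hash selector
- RoundRobinLoadBalance: round-robin selector
- DefaultLoadBalance: the default selector, which tries ConsistentHashLoadBalance, then HashLoadBalance, then RoundRobinLoadBalance
The source shows that three load-balancing strategies are provided. The default is DefaultLoadBalance (which is set up to use RoundRobinLoadBalance).
Let's analyze RoundRobinLoadBalance.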
public Invoker<T> select(InvokeContext invocation) throws NoInvokerException {
    List<Invoker<T>> staticWeightInvokers = staticWeightInvokersCache;
    //polling use weights
    if (staticWeightInvokers != null && !staticWeightInvokers.isEmpty()) {
        Invoker<T> invoker = staticWeightInvokers.get((staticWeightSequence.getAndIncrement() & Integer.MAX_VALUE) % staticWeightInvokers.size());
        if (invoker.isAvailable()) return invoker;
        ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
        if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
            //Try to recall after blocking
            logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
            stat.setLastRetryTime(System.currentTimeMillis());
            return invoker;
        }
    }
    //polling without weights
    List<Invoker<T>> sortedInvokers = sortedInvokersCache;
    if (CollectionUtils.isEmpty(sortedInvokers)) {
        throw new NoInvokerException("no such active connection invoker");
    }
    List<Invoker<T>> list = new ArrayList<Invoker<T>>();
    for (Invoker<T> invoker : sortedInvokers) {
        if (!invoker.isAvailable()) {
            /**
             * Try to recall after blocking
             */
            ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
            if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
                list.add(invoker);
            }
        } else {
            list.add(invoker);
        }
    }
    //TODO When all is not available. Whether to randomly extract one
    if (list.isEmpty()) {
        throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
    }
    Invoker<T> invoker = list.get((sequence.getAndIncrement() & Integer.MAX_VALUE) % list.size());
    if (!invoker.isAvailable()) {
        //Try to recall after blocking
        logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
        ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
    }
    return invoker;
}
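In short, it walks the Invoker list in round-robin order and picks an available Invoker. An Invoker that is marked unavailable is tried again once its retry interval has elapsed.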
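The index arithmetic is the part worth isolating. The sketch below is a minimal illustration, not the Tars class: RoundRobinSketch is a hypothetical name, and availability checks and weights are left out. It shows why the counter is masked with Integer.MAX_VALUE: once the AtomicInteger overflows to a negative value, the mask clears the sign bit so the modulo result is never a negative index.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector without weights or health checks.
class RoundRobinSketch<T> {
    private final List<T> nodes;
    private final AtomicInteger sequence;

    RoundRobinSketch(List<T> nodes, int start) {
        this.nodes = nodes;
        this.sequence = new AtomicInteger(start);
    }

    T select() {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no nodes to select from");
        }
        // Masking keeps the value non-negative even after the counter overflows.
        int index = (sequence.getAndIncrement() & Integer.MAX_VALUE) % nodes.size();
        return nodes.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> rr = new RoundRobinSketch<>(Arrays.asList("a", "b", "c"), 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.select());
        }
    }
}
```

Note that the rotation is not perfectly even at the overflow point, which is harmless for load balancing.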
6. ProtocolInvoker
public interface ProtocolInvoker<T> {
    InvokeContext createContext(Object proxy, Method method, Object[] args) throws Exception;
    Invoker<T> create(Class<T> api, Url url) throws Exception;
    Collection<Invoker<T>> getInvokers();
    void refresh();
    void destroy();
}
The interface obtains the invoke context, creates Invokers, and returns the current Invokers.
@Override
public Invoker<T> create(Class<T> api, Url url) throws Exception {
    return new TarsInvoker<T>(servantProxyConfig, api, url, getClients(url));
}

protected ServantClient[] getClients(Url url) throws IOException {
    int connections = url.getParameter(Constants.TARS_CLIENT_CONNECTIONS, Constants.default_connections);
    ServantClient[] clients = new ServantClient[connections];
    for (int i = 0; i < clients.length; i++) {
        clients[i] = initClient(url);
    }
    return clients;
}

protected ServantClient initClient(Url url) {
    ServantClient client = null;
    try {
        boolean tcpNoDelay = url.getParameter(Constants.TARS_CLIENT_TCPNODELAY, false);
        long connectTimeout = url.getParameter(Constants.TARS_CLIENT_CONNECTTIMEOUT, Constants.default_connect_timeout);
        long syncTimeout = url.getParameter(Constants.TARS_CLIENT_SYNCTIMEOUT, Constants.default_sync_timeout);
        long asyncTimeout = url.getParameter(Constants.TARS_CLIENT_ASYNCTIMEOUT, Constants.default_async_timeout);
        boolean udpMode = url.getParameter(Constants.TARS_CLIENT_UDPMODE, false);
        if (this.selectorManager == null) {
            this.selectorManager = ClientPoolManager.getSelectorManager(this.protocolFactory, this.threadPoolExecutor, true, udpMode, this.servantProxyConfig);
        }
        client = new ServantClient(url.getHost(), url.getPort(), this.selectorManager, udpMode);
        client.setConnectTimeout(connectTimeout);
        client.setSyncTimeout(syncTimeout);
        client.setAsyncTimeout(asyncTimeout);
        client.setTcpNoDelay(tcpNoDelay);
    } catch (Throwable e) {
        throw new ClientException(servantProxyConfig.getSimpleObjectName(), "Fail to create client|" + url.toIdentityString() + "|" + e.getLocalizedMessage(), e);
    }
    return client;
}
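create builds an Invoker (the handler that connects to the server). The url is either the one we set locally or the one obtained from the registry.
refresh refreshes the current Invokers.
At this point our core ObjectProxy has been created.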
ObjectProxy
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    InvokeContext context = protocolInvoker.createContext(proxy, method, args);
    try {
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return this.toString();
        } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return this.hashCode();
        } else if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return this.equals(args[0]);
        } else if ("getObjectName".equals(methodName) && parameterTypes.length == 0) {
            return this.getObjectName();
        } else if ("getApi".equals(methodName) && parameterTypes.length == 0) {
            return this.getApi();
        } else if ("getConfig".equals(methodName) && parameterTypes.length == 0) {
            return this.getConfig();
        } else if ("destroy".equals(methodName) && parameterTypes.length == 0) {
            this.destroy();
            return null;
        } else if ("refresh".equals(methodName) && parameterTypes.length == 0) {
            this.refresh();
            return null;
        }
        Invoker invoker = loadBalancer.select(context);
        return invoker.invoke(context);
    } catch (Throwable e) {
        e.printStackTrace();
        if (logger.isDebugEnabled()) {
            logger.debug(servantProxyConfig.getSimpleObjectName() + " error occurred on invoke|" + e.getLocalizedMessage(), e);
        }
        if (e instanceof NoInvokerException) {
            throw new NoConnectionException(servantProxyConfig.getSimpleObjectName(), e.getLocalizedMessage(), e);
        }
        throw new ClientException(servantProxyConfig.getSimpleObjectName(), e.getLocalizedMessage(), e);
    }
}
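Calling a method on a client Prx interface (for example HelloPrx) ends up in ObjectProxy.invoke:
- Get the interface method's name and parameter types
- Get the InvokeContext
- loadBalancer selects an available Invoker (a connection to the server)
- TarsInvoker.invoke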
protected Object doInvokeServant(final ServantInvokeContext inv) throws Throwable {
    final long begin = System.currentTimeMillis();
    int ret = Constants.INVOKE_STATUS_SUCC;
    try {
        Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());
        if (inv.isAsync()) {
            invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
            return null;
        } else if (inv.isPromiseFuture()) {
            return invokeWithPromiseFuture(method, inv.getArguments(), inv.getAttachments());// return Future Result
        } else {
            TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
            ret = response.getRet() == TarsHelper.SERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
            if (response.getRet() != TarsHelper.SERVERSUCCESS) {
                throw ServerException.makeException(response.getRet());
            }
            return response.getResult();
        }
    } catch (Throwable e) {
        if (e instanceof TimeoutException) {
            ret = Constants.INVOKE_STATUS_TIMEOUT;
        } else if (e instanceof NotConnectedException) {
            ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
        } else {
            ret = Constants.INVOKE_STATUS_EXEC;
        }
        throw e;
    } finally {
        if (inv.isNormal()) {
            setAvailable(ServantInvokerAliveChecker.isAlive(getUrl(), config, ret));
            InvokeStatHelper.getInstance().addProxyStat(objName)
                    .addInvokeTimeByClient(config.getMasterName(), config.getSlaveName(), config.getSlaveSetName(), config.getSlaveSetArea(),
                            config.getSlaveSetID(), inv.getMethodName(), getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
        }
    }
}
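- The invocation context determines the call style: asynchronous, promise/future, or synchronous
- Take the synchronous call as an example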
private TarsServantResponse invokeWithSync(Method method, Object args[], Map<String, String> context) throws Throwable {
    ServantClient client = getClient();
    TarsServantRequest request = new TarsServantRequest(client.getIoSession());
    request.setVersion(TarsHelper.VERSION);
    request.setMessageType(isHashInvoke(context) ? TarsHelper.MESSAGETYPEHASH : TarsHelper.MESSAGETYPENULL);
    request.setPacketType(TarsHelper.NORMAL);
    request.setServantName(objName);
    request.setFunctionName(method.getName());
    request.setApi(super.getApi());
    request.setMethodInfo(AnalystManager.getInstance().getMethodMap(super.getApi()).get(method));
    request.setMethodParameters(args);
    request.setContext(context);
    request.setInvokeStatus(InvokeStatus.SYNC_CALL);
    TarsServantResponse response = new TarsServantResponse(request.getIoSession());
    response.setRequest(request);
    response.setRequestId(request.getTicketNumber());
    response.setVersion(request.getVersion());
    response.setPacketType(request.getPacketType());
    response.setMessageType(request.getMessageType());
    response.setStatus(request.getStatus());
    response.setRequest(request);
    response.setCharsetName(request.getCharsetName());
    response.setTimeout(request.getTimeout());
    response.setContext(request.getContext());
    DistributedContext distributedContext = DistributedContextManager.getDistributedContext();
    Boolean bDyeing = distributedContext.get(DyeingSwitch.BDYEING);
    if (bDyeing != null && bDyeing == true) {
        request.setMessageType(request.getMessageType() | TarsHelper.MESSAGETYPEDYED);
        HashMap<String, String> status = new HashMap<String, String>();
        String routeKey = distributedContext.get(DyeingSwitch.DYEINGKEY);
        String fileName = distributedContext.get(DyeingSwitch.FILENAME);
        status.put(DyeingSwitch.STATUS_DYED_KEY, routeKey == null ? "" : routeKey);
        status.put(DyeingSwitch.STATUS_DYED_FILENAME, fileName == null ? "" : fileName);
        request.setStatus(status);
    }
    FilterChain filterChain = new TarsClientFilterChain(filters, objName, FilterKind.CLIENT, client, InvokeStatus.SYNC_CALL, null);
    filterChain.doFilter(request, response);
    return response;
}
- Create the Tars request and response objects
- Run the filters first; this works on the same principle as an HTTP filter
- TarsClientFilterChain.doRealInvoke
@Override
protected void doRealInvoke(Request request, Response response) throws Throwable {
    if (request instanceof TarsServantRequest && target != null) {
        TarsServantResponse tarsServantResponse = (TarsServantResponse) response;
        switch (type) {
            case SYNC_CALL:
                try {
                    TarsServantResponse result = target.invokeWithSync((ServantRequest) request);
                    BeanAccessor.setBeanValue(tarsServantResponse, "cause", result.getCause());
                    BeanAccessor.setBeanValue(tarsServantResponse, "result", result.getResult());
                    BeanAccessor.setBeanValue(tarsServantResponse, "ret", result.getRet());
                } catch (Exception e) {
                    BeanAccessor.setBeanValue(tarsServantResponse, "cause", e);
                    throw e;
                }
                return;
            case ASYNC_CALL:
                target.invokeWithAsync((ServantRequest) request, callback);
                return;
            case FUTURE_CALL:
                target.invokeWithFuture((ServantRequest) request, callback);
                return;
        }
    }
}
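The filter principle mentioned before doRealInvoke (filters run first, the real call runs last) can be illustrated with a minimal sketch. SketchFilter, SketchChain and FilterChainDemo are hypothetical names, not Tars classes, and the real TarsClientFilterChain carries more state such as the request, the response and the call type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical filter contract: do some work, then hand control to the chain.
interface SketchFilter {
    void doFilter(List<String> trace, SketchChain chain);
}

// Hypothetical chain: walks the filters in order, then performs the real call.
class SketchChain {
    private final List<SketchFilter> filters;
    private final Runnable realInvoke;
    private int index = 0;

    SketchChain(List<SketchFilter> filters, Runnable realInvoke) {
        this.filters = filters;
        this.realInvoke = realInvoke;
    }

    void doFilter(List<String> trace) {
        if (index < filters.size()) {
            // Each filter decides when (and whether) to continue the chain.
            filters.get(index++).doFilter(trace, this);
        } else {
            // All filters have run: this corresponds to the doRealInvoke step.
            realInvoke.run();
        }
    }
}

class FilterChainDemo {
    static SketchFilter named(String name) {
        return (trace, chain) -> {
            trace.add(name + ":before");
            chain.doFilter(trace);
            trace.add(name + ":after");
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<SketchFilter> filters = Arrays.asList(named("a"), named("b"));
        new SketchChain(filters, () -> trace.add("invoke")).doFilter(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Filters therefore wrap the real call like layers: code before chain.doFilter runs on the way in, code after it runs on the way out.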
public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
    Ticket<T> ticket = null;
    T response = null;
    try {
        ensureConnected();
        request.setInvokeStatus(InvokeStatus.SYNC_CALL);
        ticket = TicketManager.createTicket(request, session, this.syncTimeout);
        Session current = session;
        current.write(request);
        if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
            if (current != null && current.getStatus() != SessionStatus.CLIENT_CONNECTED) {
                throw new IOException("Connection reset by peer|" + this.getAddress());
            } else {
                throw new TimeoutException("the operation has timeout, " + this.syncTimeout + "ms|" + this.getAddress());
            }
        }
        response = ticket.response();
        if (response == null) {
            throw new IOException("the operation is failed.");
        }
        return response;
    } catch (InterruptedException e) {
        logger.error(e.getLocalizedMessage());
    } finally {
        if (ticket != null) {
            TicketManager.removeTicket(ticket.getTicketNumber());
        }
    }
    return response;
}
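The call is made synchronously: the request is written to the session, and the caller waits on a ticket until the response arrives or the timeout expires. Before calling, it first checks whether the connection is established.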
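The ticket mechanism, a blocking call layered on asynchronous I/O, can be sketched with a CountDownLatch. This is an illustration under my own assumptions, not the Tars implementation: TicketSketch, TicketRegistrySketch and SyncCallSketch are hypothetical names, and the payload is a plain String.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Hypothetical ticket: one pending request waiting for its response.
class TicketSketch {
    final int id;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile String response;

    TicketSketch(int id) { this.id = id; }

    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    void complete(String payload) {
        response = payload;
        done.countDown();
    }

    String response() { return response; }
}

// Hypothetical registry that matches incoming responses to waiting callers.
class TicketRegistrySketch {
    private final Map<Integer, TicketSketch> pending = new ConcurrentHashMap<>();
    private final AtomicInteger ids = new AtomicInteger();

    TicketSketch create() {
        TicketSketch t = new TicketSketch(ids.incrementAndGet());
        pending.put(t.id, t);
        return t;
    }

    // Would be called by the I/O thread when a response frame arrives.
    void onResponse(int id, String payload) {
        TicketSketch t = pending.get(id);
        if (t != null) t.complete(payload);
    }

    void remove(int id) { pending.remove(id); }

    int pendingCount() { return pending.size(); }
}

class SyncCallSketch {
    static String call(TicketRegistrySketch registry, IntConsumer send, long timeoutMs)
            throws InterruptedException, TimeoutException {
        TicketSketch ticket = registry.create();
        try {
            send.accept(ticket.id); // write the request, tagged with the ticket id
            if (!ticket.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("no response within " + timeoutMs + "ms");
            }
            return ticket.response();
        } finally {
            registry.remove(ticket.id); // always release the ticket, as in the finally block above
        }
    }
}
```

Removing the ticket in finally matters: without it, every timed-out call would leave an entry behind in the registry.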
public void ensureConnected() throws IOException {
    if (isNotConnected()) {
        reConnect();
    }
}

protected synchronized void reConnect() throws IOException {
    if (isNotConnected()) {
        SocketAddress server = new InetSocketAddress(this.host, this.port);
        SelectableChannel channel = null;
        Session temp = null;
        int event;
        if (this.udpMode) {
            channel = DatagramChannel.open();
            channel.configureBlocking(false);
            temp = new UDPSession(this.selectorManager);
            ((UDPSession) temp).setBufferSize(bufferSize);
            ((UDPSession) temp).setTarget(server);
            event = SelectionKey.OP_READ;
            temp.setStatus(SessionStatus.CLIENT_CONNECTED);
        } else {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            try {
                if (this.tc != INVALID_TRAFFIC_CLASS_VALUE) {
                    ((SocketChannel) channel).socket().setTrafficClass(this.tc);
                }
            } catch (Exception ex) {
                logger.error(ex.getLocalizedMessage());
            }
            ((SocketChannel) channel).connect(server);
            temp = new TCPSession(this.selectorManager);
            ((TCPSession) temp).setTcpNoDelay(this.tcpNoDelay);
            event = SelectionKey.OP_CONNECT;
        }
        temp.setChannel(channel);
        temp.setKeepAlive(selectorManager.isKeepAlive());
        this.selectorManager.nextReactor().registerChannel(channel, event, temp);
        if (!this.udpMode) {
            if (!temp.waitToConnect(this.connectTimeout)) {
                temp.asyncClose();
                throw new TimeoutException("connect " + this.connectTimeout + "ms timed out to " + this.getAddress());
            }
            if (temp.getStatus() == SessionStatus.NOT_CONNECTED) {
                temp.asyncClose();
                throw new NotConnectedException("connect failed to " + this.getAddress());
            } else if (temp.getStatus() == SessionStatus.CLOSED) {
                throw new NotConnectedException("connect failed to " + this.getAddress());
            }
        }
        this.session = temp;
    }
}
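If there is no connection yet, the client connects to the service over TCP (or UDP). The channel is non-blocking and registered with a reactor, so reads and writes go through NIO.
The Response is then returned for processing.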
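For readers unfamiliar with non-blocking connects, here is a minimal sketch of the TCP branch using only java.nio. It is a simplification under stated assumptions: NonBlockingConnectSketch is a hypothetical class, it opens a private Selector per connect instead of sharing a reactor as Tars does, and timeoutMs must be greater than zero because select(0) blocks indefinitely.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper: non-blocking connect with a timeout, using only java.nio.
class NonBlockingConnectSketch {
    static SocketChannel connect(InetSocketAddress server, long timeoutMs) throws IOException {
        SocketChannel channel = SocketChannel.open();
        Selector selector = Selector.open();
        try {
            channel.configureBlocking(false);
            // connect() returns false when the connection is still in progress.
            if (!channel.connect(server)) {
                channel.register(selector, SelectionKey.OP_CONNECT);
                // Wait until the channel is ready to finish connecting, or time out.
                if (selector.select(timeoutMs) == 0 || !channel.finishConnect()) {
                    throw new IOException("connect " + timeoutMs + "ms timed out to " + server);
                }
            }
            return channel;
        } catch (IOException e) {
            channel.close();
            throw e;
        } finally {
            // Closing the selector cancels its key; the channel itself stays open.
            selector.close();
        }
    }
}
```

In Tars the waiting is done by temp.waitToConnect while a shared reactor thread handles the OP_CONNECT event, which lets many connections share a small number of selector threads.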
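That completes the client request. I will analyze the details in a later post.
Now that we know how a Tars service starts and how the client calls it, what about the annotation-based approach in Spring Boot? For anyone who has read the Spring source closely it is quite simple; the next post analyzes how Spring Boot uses annotations.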