Tars Source Code Analysis 1: Server Startup
Posted Small leaf
The example used here comes from TarsJava/examples/quickstart-server/.
Create a launcher class, App.java:
package com.qq.tars.quickstart.server;

/**
 * @author : yesheng
 * @Description :
 * @Date : 2020-01-31
 */
public class App {
    public static void main(String[] args) throws Exception {
        com.qq.tars.server.startup.Main.main(args);
    }
}
Set the JVM argument to:
-Dconfig=/Users/yesheng/Documents/huya/tars/TarsJava/examples/quickstart-server/src/main/resources/TestApp.HelloJavaServer.config.conf
Running App.main then starts the server.
1. How the service starts
1.1 AppContext
public interface AppContext {
    /**
     * Returns the service name.
     * @return
     */
    String name();

    /**
     *
     */
    void stop();

    ServantHomeSkeleton getCapHomeSkeleton(String homeName);

    List<Filter> getFilters(FilterKind kind);

    void addFilter(FilterKind kind, Filter filter);

    void init();
}
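These are the main methods of a Tars application.
public abstract class BaseAppContext implements AppContext
BaseAppContext is an abstract class that implements AppContext.
When an abstract class implements an interface, it usually applies the template method pattern: one abstract (template) step is deferred to subclasses, so each kind of AppContext can supply its own implementation.
Subclasses of BaseAppContext:
- XmlAppContext: reads Servants from XML (servants.xml)
- SpringAppContext: reads Servants the Spring way (servants-spring.xml)
- SpringBootAppContext: reads Servants from annotations (TarsServant)
- TarsStartLifecycle: the Spring Cloud way
Seen this way, writing a custom container is straightforward. To read Servants from JSON, for example, we only need to extend BaseAppContext and implement the corresponding methods.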
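The idea of a JSON-backed context can be sketched without any Tars dependency. Everything below is hypothetical: SketchAppContext is a simplified stand-in for BaseAppContext, JsonSketchAppContext is an invented subclass, and the "JSON" is a flat name-to-class map parsed naively. It is only meant to show the template method shape, not how Tars itself would load Servants.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for BaseAppContext (hypothetical, not the Tars class).
abstract class SketchAppContext {
    private final Map<String, Object> servants = new LinkedHashMap<>();
    private final List<String> startupLog = new ArrayList<>();

    // Template method: the startup order is fixed here,
    // only loadServants() is deferred to subclasses.
    public final void init() {
        try {
            loadServants();
            startupLog.add("started with " + servants.keySet());
        } catch (Exception ex) {
            startupLog.add("failed: " + ex);
        }
    }

    protected abstract void loadServants() throws Exception;

    protected void register(String name, Object impl) {
        servants.put(name, impl);
    }

    public Map<String, Object> servants() { return servants; }

    public List<String> startupLog() { return startupLog; }
}

// Hypothetical subclass: reads {"name":"implementation.class.Name"} pairs.
class JsonSketchAppContext extends SketchAppContext {
    private final String json;

    JsonSketchAppContext(String json) {
        this.json = json;
    }

    @Override
    protected void loadServants() throws Exception {
        // Naive parsing for illustration only; a real context would use a JSON library.
        String body = json.trim();
        body = body.substring(1, body.length() - 1); // drop the outer braces
        for (String pair : body.split(",")) {
            String[] kv = pair.split(":");
            String name = kv[0].trim().replace("\"", "");
            String className = kv[1].trim().replace("\"", "");
            register(name, Class.forName(className).getDeclaredConstructor().newInstance());
        }
    }
}
```

Calling new JsonSketchAppContext("{\"HelloObj\":\"java.util.ArrayList\"}").init() registers one instance under the name HelloObj; a class name that cannot be loaded is recorded in the startup log instead of propagating, mirroring the try/catch style of init().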
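1.2 Initializing the Server
private Server() {
    System.out.println("[TARS] start server construction");
    loadServerConfig();
    initCommunicator();
    startManagerService();
}
- loadServerConfig: loads the server configuration and reads the application name, IP, port, logging settings, and so on
- initCommunicator: initializes the Communicator, the client-side object the server uses to talk to other Tars services
- startManagerService: starts the management services that report this server to the management center. It creates three scheduled tasks: NodeHandleThread (node heartbeat), StatHandleThread (server status reporting), and PropertyHandleThread (periodic reporting of instrumentation data).
In short, constructing the Server loads the configuration and sets up the heartbeat reporting.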
1.3 Loading Servants
@Override
public void init() {
    try {
        loadServants();
        // inject om admin servant
        injectAdminServant();
        initServants();
        appContextStarted();
        System.out.println("[SERVER] The application started successfully.");
    } catch (Exception ex) {
        ready = false;
        ex.printStackTrace();
        System.out.println("[SERVER] failed to start the application.");
    }
}
loadServants is the template method: each Tars context implementation reads its Servants in its own way.
All the other steps are implemented by BaseAppContext.
XmlAppContext.loadServants
protected void loadServants() throws Exception {
    XMLConfigFile cfg = new XMLConfigFile();
    cfg.parse(getClass().getClassLoader().getResource("servants.xml").openStream());
    XMLConfigElement root = cfg.getRootElement();
    ArrayList<XMLConfigElement> elements = root.getChildList();
    loadInitParams(root.getChildListByName("context-param"));
    loadAppContextListeners(elements);
    loadAppServants(elements);
    loadDefaultFilter();
    loadAppFilters(elements);
}
In brief:
- Parse servants.xml
- Load the initialization parameters
- Load the app context listeners
- Load the Servants
- Load the default Filter
- Load the custom Filters
private ServantHomeSkeleton loadServant(XMLConfigElement element) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
    String homeName = null, homeApiName = null, homeClassName = null, processorClazzName = null,
            codecClazzName = null;
    Class<?> homeApiClazz = null;
    Class<? extends Codec> codecClazz = null;
    Class<? extends Processor> processorClazz = null;
    Object homeClassImpl = null;
    ServantHomeSkeleton skeleton = null;
    int maxLoadLimit = -1;
    ServerConfig serverCfg = ConfigurationManager.getInstance().getServerConfig();

    // Default servant name: <application>.<serverName>.<name attribute>
    homeName = element.getStringAttribute("name");
    if (StringUtils.isEmpty(homeName)) {
        throw new RuntimeException("servant name is null.");
    }
    homeName = String.format("%s.%s.%s", serverCfg.getApplication(), serverCfg.getServerName(), homeName);

    homeApiName = getChildNodeValue(element, "home-api");
    homeClassName = getChildNodeValue(element, "home-class");
    processorClazzName = getChildNodeValue(element, "home-processor-class");
    codecClazzName = getChildNodeValue(element, "home-codec-class");

    homeApiClazz = Class.forName(homeApiName);
    homeClassImpl = Class.forName(homeClassName).newInstance();
    codecClazz = (Class<? extends Codec>) (StringUtils.isEmpty(codecClazzName) ? null : Class.forName(codecClazzName));
    processorClazz = (Class<? extends Processor>) (StringUtils.isEmpty(processorClazzName) ? null : Class.forName(processorClazzName));

    // A fully qualified name on the @Servant annotation overrides the default name
    if (TarsHelper.isServant(homeApiClazz)) {
        String servantName = homeApiClazz.getAnnotation(Servant.class).name();
        if (!StringUtils.isEmpty(servantName) && servantName.matches("^[\\w]+\\.[\\w]+\\.[\\w]+$")) {
            homeName = servantName;
        }
    }

    ServantAdapterConfig servantAdapterConfig = serverCfg.getServantAdapterConfMap().get(homeName);
    ServantAdapter ServerAdapter = new ServantAdapter(servantAdapterConfig);
    skeleton = new ServantHomeSkeleton(homeName, homeClassImpl, homeApiClazz, codecClazz, processorClazz, maxLoadLimit);
    skeleton.setAppContext(this);
    ServerAdapter.bind(skeleton);
    servantAdapterMap.put(homeName, ServerAdapter);
    return skeleton;
}
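The name resolution inside loadServant can be isolated into a few lines. The sketch below is a standalone illustration, not Tars code: ServantNameSketch and resolve are made-up names, and the sample values used with it (TestApp, HelloJavaServer, HelloObj) are assumptions based on the quickstart config file name.

```java
import java.util.regex.Pattern;

// Standalone illustration of how loadServant picks the servant name (hypothetical helper).
final class ServantNameSketch {
    // Same pattern as in loadServant: three word segments separated by dots.
    private static final Pattern FULL_NAME = Pattern.compile("^[\\w]+\\.[\\w]+\\.[\\w]+$");

    static String resolve(String app, String server, String xmlName, String annotationName) {
        if (xmlName == null || xmlName.isEmpty()) {
            throw new IllegalArgumentException("servant name is null.");
        }
        // Default: <application>.<serverName>.<name attribute from the XML>
        String homeName = String.format("%s.%s.%s", app, server, xmlName);
        // A fully qualified annotation name replaces the default; anything else is ignored.
        if (annotationName != null && !annotationName.isEmpty()
                && FULL_NAME.matcher(annotationName).matches()) {
            homeName = annotationName;
        }
        return homeName;
    }
}
```

With no annotation name, or with one that lacks the three dotted segments, the XML-derived name wins; only an annotation value such as Other.Server.Obj replaces it.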
ServantAdapterConfig servantAdapterConfig = serverCfg.getServantAdapterConfMap().get(homeName);
ServantAdapter ServerAdapter = new ServantAdapter(servantAdapterConfig);
skeleton = new ServantHomeSkeleton(homeName, homeClassImpl, homeApiClazz, codecClazz, processorClazz, maxLoadLimit);
skeleton.setAppContext(this);
ServerAdapter.bind(skeleton);
Note the call to ServerAdapter.bind(skeleton): the NIO server is started while the Servant is being loaded.
public void bind(AppService appService) throws IOException {
    this.skeleton = (ServantHomeSkeleton) appService;
    ServerConfig serverCfg = ConfigurationManager.getInstance().getServerConfig();
    boolean keepAlive = true;
    Codec codec = createCodec(serverCfg);
    Processor processor = createProcessor(serverCfg);
    Executor threadPool = ServantThreadPoolManager.get(servantAdapterConfig);
    Endpoint endpoint = this.servantAdapterConfig.getEndpoint();

    if (endpoint.type().equals("tcp")) {
        this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false);
        this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
        this.selectorManager.start();
        System.out.println("[SERVER] server starting at " + endpoint + "...");

        // Non-blocking listening socket, registered for accept events on the first reactor
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024);
        serverChannel.configureBlocking(false);
        selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);
        System.out.println("[SERVER] server started at " + endpoint + "...");
    } else if (endpoint.type().equals("udp")) {
        this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);
        this.selectorManager.start();
        System.out.println("[SERVER] server starting at " + endpoint + "...");

        // UDP has no accept step, so the channel is registered for read events
        DatagramChannel serverChannel = DatagramChannel.open();
        DatagramSocket socket = serverChannel.socket();
        socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));
        serverChannel.configureBlocking(false);
        this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);
        System.out.println("[SERVER] servant started at " + endpoint + "...");
    }
}
bind opens the service port so that clients can call the Servant.
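The TCP branch of bind follows the standard java.nio pattern: open a ServerSocketChannel, bind it, switch it to non-blocking mode, and register it for OP_ACCEPT. The sketch below reproduces only that pattern with a plain Selector; it does not use Tars's SelectorManager or reactors, and the class name, the loopback address, and the ephemeral port are all choices made for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Plain java.nio version of "bind, go non-blocking, register for accept" (illustrative only).
final class NioAcceptSketch {
    // Binds to an ephemeral loopback port, connects one client, and returns
    // how many connections the selector loop accepted.
    static int acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.socket().bind(new InetSocketAddress("127.0.0.1", 0), 1024);
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = server.socket().getLocalPort();
            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                selector.select(2000); // wait up to 2 seconds for the accept event
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        try (SocketChannel peer = ((ServerSocketChannel) key.channel()).accept()) {
                            if (peer != null) {
                                accepted++;
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A real server would keep the selector loop running on a dedicated thread and register each accepted channel for OP_READ, which is roughly the job the reactors take over in Tars.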
SpringAppContext.loadServants
protected void loadServants() {
    this.applicationContext = new ClassPathXmlApplicationContext("servants-spring.xml");
    loadAppContextListeners(this.applicationContext);
    loadAppServants(this.applicationContext);
    loadDefaultFilter();
    loadAppFilters(this.applicationContext);
}
In brief:
- Load servants-spring.xml
- Load the app context listeners
- Load the Servants
- Load the default Filter
- Load the custom Filters
SpringBootAppContext loads in much the same way; only the way the Servants and the other components are read differs.
public abstract class BaseAppContext implements AppContext {
    boolean ready = true;
    ConcurrentHashMap<String, ServantHomeSkeleton> skeletonMap = new ConcurrentHashMap<>();
    ConcurrentHashMap<String, Adapter> servantAdapterMap = new ConcurrentHashMap<>();
    HashMap<String, String> contextParams = new HashMap<>();
    Set<AppContextListener> listeners = new HashSet<>(4);
    Map<FilterKind, List<Filter>> filters = new HashMap<>();
}
The objects created during loading are stored in BaseAppContext for later use.
1.4 AppContextListener
public interface AppContextListener {
    void appContextStarted(AppContextEvent event);

    void appServantStarted(AppServantEvent event);
}
appContextStarted is called when the server context starts, which happens after appServantStarted.
void appContextStarted() {
    for (AppContextListener listener : listeners) {
        listener.appContextStarted(new DefaultAppContextEvent(this));
    }
}
The container finally starts in appContextStarted. In other words, once the Tars service has started and registered successfully, we can initialize our own container; this method is where a Spring or Spring Boot project can be started.
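The callback order described above can be shown with a small, dependency-free sketch. The types here are hypothetical simplifications: SketchContextListener takes plain strings instead of the AppContextEvent and AppServantEvent objects, and ListenerOrderSketch stands in for the context.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified listener: strings replace the real event objects (assumption for brevity).
interface SketchContextListener {
    void appContextStarted(String contextName);

    void appServantStarted(String servantName);
}

// Records the callbacks it receives, in order.
final class RecordingListener implements SketchContextListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void appContextStarted(String contextName) {
        events.add("context:" + contextName);
    }

    @Override
    public void appServantStarted(String servantName) {
        events.add("servant:" + servantName);
    }
}

// Stand-in for the context: notifies per servant first, then once for the whole context.
final class ListenerOrderSketch {
    private final Set<SketchContextListener> listeners = new LinkedHashSet<>();
    private final List<String> servantNames = new ArrayList<>();
    private final String name;

    ListenerOrderSketch(String name) {
        this.name = name;
    }

    void addListener(SketchContextListener listener) {
        listeners.add(listener);
    }

    void addServant(String servantName) {
        servantNames.add(servantName);
    }

    void start() {
        for (String servant : servantNames) {
            for (SketchContextListener listener : listeners) {
                listener.appServantStarted(servant);
            }
        }
        for (SketchContextListener listener : listeners) {
            listener.appContextStarted(name);
        }
    }
}
```

A listener that boots a Spring container would put that work in appContextStarted, since by then every Servant has already been announced.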
2. How the service registers with the registry
private Server() {
    System.out.println("[TARS] start server construction");
    loadServerConfig();
    initCommunicator();
    startManagerService();
}

private void startManagerService() {
    OmServiceMngr.getInstance().initAndStartOmService();
}
public void initAndStartOmService() {
    Communicator communicator = CommunicatorFactory.getInstance().getCommunicator();
    String app = ConfigurationManager.getInstance().getServerConfig().getApplication();
    String serverName = ConfigurationManager.getInstance().getServerConfig().getServerName();
    String basePath = ConfigurationManager.getInstance().getServerConfig().getBasePath();
    String modualName = ConfigurationManager.getInstance().getServerConfig().getCommunicatorConfig().getModuleName();

    ConfigHelper.getInstance().setConfigInfo(communicator, app, serverName, basePath);
    NodeHelper.getInstance().setNodeInfo(communicator, app, serverName);
    NotifyHelper.getInstance().setNotifyInfo(communicator, app, serverName);
    PropertyReportHelper.getInstance().init(communicator, modualName);
    NodeHelper.getInstance().reportVersion(ClientVersion.getVersion());

    Policy avgPolicy = new CommonPropertyPolicy.Avg();
    Policy maxPolicy = new CommonPropertyPolicy.Max();
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropWaitTime, avgPolicy, maxPolicy);
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropHeapUsed, new MemoryHeapUsedAvg());
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropHeapCommitted, new MemoryHeapCommittedAvg());
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropHeapMax, new MemoryHeapMaxAvg());
    PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropThreadCount, new ThreadNumAvg());
    for (GarbageCollectorMXBean gcMXBean : ManagementFactory.getGarbageCollectorMXBeans()) {
        PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropGcCount + gcMXBean.getName(), new GCNumCount(gcMXBean.getName()));
        PropertyReportHelper.getInstance().createPropertyReporter(OmConstants.PropGcTime + gcMXBean.getName(), new GCTimeSum(gcMXBean.getName()));
    }

    ServerStatHelper.getInstance().init(communicator);
    TarsTraceZipkinConfiguration.getInstance().init();
    ScheduledServiceMngr.getInstance().start();
}
public void start() {
    startHandleService();
}

private void startHandleService() {
    Runnable nodeHandler = new Thread(new NodeHandleThread(), "HeartBeat");
    taskExecutorManager.scheduleAtFixedRate(nodeHandler, 0, HEART_BEAT_INTERVAL, TimeUnit.SECONDS);

    int initialDelay = REPORT_INTERVAL + (random.nextInt(30) * 1000);
    Runnable statHandler = new Thread(new StatHandleThread(), "ServerStat");
    taskExecutorManager.scheduleAtFixedRate(statHandler, initialDelay, REPORT_INTERVAL, TimeUnit.MILLISECONDS);

    if (REPORT_INTERVAL < PROPERTY_REPORT_MIN_INTERVAL) {
        REPORT_INTERVAL = PROPERTY_REPORT_MIN_INTERVAL;
    }
    initialDelay = REPORT_INTERVAL + (random.nextInt(30) * 1000);
    Runnable propertyHandler = new Thread(new PropertyHandleThread(), "PropertyReport");
    taskExecutorManager.scheduleAtFixedRate(propertyHandler, initialDelay, REPORT_INTERVAL, TimeUnit.MILLISECONDS);
}
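Reading the source up to this point, I notice that Tars relies heavily on singletons. A well-designed framework should arguably have fewer singletons in its source and lean more on interfaces, abstraction, and composition.
Moving on, the key method is startHandleService.
It creates three scheduled tasks:
- HeartBeat: the heartbeat. Every 10 seconds the task calls NodeHelper.getInstance().keepAlive() to send a heartbeat to the Tars node.
- ServerStat: reports the server status.
- PropertyReport: reports properties, that is, the instrumentation data.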
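The scheduling logic of the three tasks above boils down to two small calculations plus a fixed-rate schedule. The sketch below restates them with java.util.concurrent only. ReportSchedulerSketch and its method names are invented, and MIN_INTERVAL_MS is an assumed placeholder for PROPERTY_REPORT_MIN_INTERVAL, whose real value is not shown in the excerpt.

```java
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative restatement of the interval clamp, the jittered delay, and fixed-rate scheduling.
final class ReportSchedulerSketch {
    // Assumed placeholder for PROPERTY_REPORT_MIN_INTERVAL (the real value is not in the excerpt).
    static final int MIN_INTERVAL_MS = 30_000;

    // Mirrors: if (REPORT_INTERVAL < PROPERTY_REPORT_MIN_INTERVAL) REPORT_INTERVAL = ...
    static int clampInterval(int requestedMs) {
        return Math.max(requestedMs, MIN_INTERVAL_MS);
    }

    // Mirrors: REPORT_INTERVAL + (random.nextInt(30) * 1000), i.e. 0 to 29 extra seconds,
    // so that many servers started together do not all report at the same instant.
    static int initialDelayMs(int intervalMs, Random random) {
        return intervalMs + random.nextInt(30) * 1000;
    }

    // Schedules a task at a fixed rate and reports whether it ran the requested number of times.
    static boolean runsRepeatedly(long periodMs, int runs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "HeartBeat"); // thread name set through the factory
            t.setDaemon(true);
            return t;
        });
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            executor.scheduleAtFixedRate(latch::countDown, 0, periodMs, TimeUnit.MILLISECONDS);
            return latch.await(periodMs * runs + 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One design note: the excerpt wraps each handler in new Thread(..., "HeartBeat") but passes it to the executor as a plain Runnable, so that Thread object is never started and its name does not apply to the worker thread. Naming threads through a ThreadFactory, as sketched here, is the usual way to get the intended names.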
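At this point the service is registered with the registry. If the registry stops receiving heartbeats, it removes the service's status. Many registries work on the same principle, Eureka and ZooKeeper for example.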
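The registry side of this heartbeat contract can be pictured as a map of last-seen timestamps with an eviction pass. This is a generic sketch of the idea, not the Tars registry implementation (which the excerpt does not show); the class name, the timeout, and the explicit clock parameter are all assumptions made to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Generic heartbeat-expiry registry (illustrative; not how Tars implements it).
final class HeartbeatRegistrySketch {
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long timeoutMs;

    HeartbeatRegistrySketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called whenever a node's heartbeat arrives.
    void keepAlive(String node, long nowMs) {
        lastSeen.put(node, nowMs);
    }

    // Removes every node whose last heartbeat is older than the timeout; returns how many were removed.
    int evictExpired(long nowMs) {
        int before = lastSeen.size();
        lastSeen.entrySet().removeIf(entry -> nowMs - entry.getValue() > timeoutMs);
        return before - lastSeen.size();
    }

    boolean isAlive(String node) {
        return lastSeen.containsKey(node);
    }
}
```

With a 30-second timeout, a node last seen at t=0 is evicted by a pass at t=40s, while a node seen at t=25s survives it.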
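3. Summary
Tars follows the same pattern as other microservice frameworks such as Dubbo and Spring Cloud.
- Initialize the node service: read the service configuration, chiefly the IP, port, and service name.
- Register with the registry: the Tars registry, Eureka, or ZooKeeper. Send heartbeats periodically to stay healthy.
- Initialize the service: the protocol, interfaces, version, and similar information.
- Start the service.
Along the way the framework exposes many interfaces to build on. However much the details vary, the core stays the same: understand how the core is implemented in the source, and extending it follows naturally.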