Pigeon源码分析 -- 客户端调用源码分析
Posted MaXianZhe
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pigeon源码分析 -- 客户端调用源码分析相关的知识,希望对你有一定的参考价值。
先看客户端调用的例子
/**
 * Client-side usage example: builds an InvokerConfig for EchoService, asks
 * ServiceFactory for the service object, calls echo once and prints the result.
 */
public static void main(String[] args) throws Exception {
    final InvokerConfig<EchoService> config = new InvokerConfig<>(EchoService.class);
    final EchoService service = ServiceFactory.getService(config);

    final String message = "echoService result:" + service.echo("echoService_input");
    System.out.println(message);
}
要分析的入口就是 ServiceFactory.getService
一 ServiceFactory.getService
/**
 * Returns the service object for the given invoker configuration.
 * This method only delegates to {@code serviceProxy.getProxy(invokerConfig)}.
 *
 * @param invokerConfig configuration describing the service to obtain
 * @param <T>           the service interface type
 * @return whatever {@code serviceProxy.getProxy} returns for this configuration
 * @throws RpcException declared for callers; propagated from the delegate
 */
public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
    final T proxy = serviceProxy.getProxy(invokerConfig);
    return proxy;
}
// Shared ServiceProxy instance, obtained from ServiceProxyLoader when this class is initialized.
static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();
二 AbstractServiceProxy# getProxy
public <T> T getProxy(InvokerConfig<T> invokerConfig) { if (invokerConfig.getServiceInterface() == null) { throw new IllegalArgumentException("service interface is required"); } if (StringUtils.isBlank(invokerConfig.getUrl())) { invokerConfig.setUrl(ServiceFactory.getServiceUrl(invokerConfig)); } if (!StringUtils.isBlank(invokerConfig.getProtocol()) && !invokerConfig.getProtocol().equalsIgnoreCase(Constants.PROTOCOL_DEFAULT)) { String protocolPrefix = "@" + invokerConfig.getProtocol().toUpperCase() + "@"; if (!invokerConfig.getUrl().startsWith(protocolPrefix)) { invokerConfig.setUrl(protocolPrefix + invokerConfig.getUrl()); } } Object service = null; service = services.get(invokerConfig); if (service == null) { try { InvokerBootStrap.startup();//初始化各种之后用得到的,比如调用链,负载均衡客户端,序列化器等 service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);//使用jdk的动态代理 if (StringUtils.isNotBlank(invokerConfig.getLoadbalance())) { LoadBalanceManager.register(invokerConfig.getUrl(), invokerConfig.getGroup(), invokerConfig.getLoadbalance());//注册负载均衡器 } } catch (Throwable t) { throw new RpcException("error while trying to get service:" + invokerConfig, t); } // setup region policy for service try { regionPolicyManager.register(invokerConfig.getUrl(), invokerConfig.getGroup(), invokerConfig.getRegionPolicy()); } catch (Throwable t) { throw new RegionException("error while setup region route policy: " + invokerConfig, t); } try { ClientManager.getInstance().registerClients(invokerConfig.getUrl(), invokerConfig.getGroup(), invokerConfig.getVip());//建立netty连接,并缓存起来
这里重点看其中的前两个方法调用即可:
InvokerBootStrap.startup();
service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
三 InvokerBootStrap.startup()
public static void startup() { if (!isStartup) { synchronized (InvokerBootStrap.class) { if (!isStartup) { RegistryConfigLoader.init();//配置中心初始化,不必关心 ServiceInvocationRepository.getInstance().init();//启动一个线程 检查所有当前还未结束的请求,每个一秒检查一次 是否执行时间大于请求里配置的超时时间 InvokerProcessHandlerFactory.init();//初始化各个Filter,这些filter会组成调用链 SerializerFactory.init();//初始化全部序列化器 LoadBalanceManager.init();//初始化四种负载均衡器 RegionPolicyManager.INSTANCE.init(); Monitor monitor = MonitorLoader.getMonitor(); if (monitor != null) { monitor.init(); } isStartup = true; logger.warn("pigeon client[version:" + VersionUtils.VERSION + "] has been started"); } } } }
这里重点看下 初始化拦截器链的逻辑
InvokerProcessHandlerFactory#init()
/**
 * Registers the client-side business filters and builds the invocation handler
 * from them. Does nothing when {@code isInitialized} is already set.
 *
 * The filters are registered in the order listed below; the monitor filter is
 * registered only when {@code Constants.MONITOR_ENABLE} is true.
 */
public static void init() {
    if (isInitialized) {
        return;
    }

    registerBizProcessFilter(new InvokerDataFilter());
    if (Constants.MONITOR_ENABLE) {
        registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
    }
    registerBizProcessFilter(new DegradationFilter());
    registerBizProcessFilter(new FlowControlPigeonClientFilter());
    registerBizProcessFilter(new ClusterInvokeFilter());
    registerBizProcessFilter(new GatewayInvokeFilter());
    registerBizProcessFilter(new ContextPrepareInvokeFilter());
    registerBizProcessFilter(new RemoteCallInvokeFilter());

    // Turn the registered filter list into a single chained handler.
    bizInvocationHandler = createInvocationHandler(bizProcessFilters);
    isInitialized = true;
}
/**
 * Builds a chain of handlers from a list of filters.
 *
 * The list is walked from the last filter to the first. Each filter is wrapped
 * in a handler whose {@code handle} passes the previously built handler (the
 * one for the following filter) to {@code filter.invoke}. The handler wrapping
 * the first filter is returned, so invoking it runs the filters in list order.
 *
 * @param internalFilters filters in the order they should run
 * @return the handler for the first filter, or {@code null} if the list is empty
 */
private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler(
        List<V> internalFilters) {
    // Work on a private copy so the loop is unaffected by later changes to the caller's list.
    final List<V> snapshot = new ArrayList<V>(internalFilters);

    ServiceInvocationHandler head = null;
    int index = snapshot.size();
    while (--index >= 0) {
        final V current = snapshot.get(index);
        // The handler built in the previous iteration; null for the last filter in the list.
        final ServiceInvocationHandler successor = head;
        head = new ServiceInvocationHandler() {
            @SuppressWarnings("unchecked")
            @Override
            public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
                return current.invoke(successor, invocationContext);
            }
        };
    }
    return head;
}
上面是责任链模式的一种很好的写法:从后往前遍历 filter 列表,把每个 filter 包装成一个 handler,并把后一个 handler 作为 next 传给它。责任链最强大的地方就在于,每个 filter 都可以在调用 next(也就是下一个 handler)之前做一些事情,也可以在调用之后再做一些事情。比如统计请求失败率的 filter,就是在 next 执行完之后,根据执行结果进行统计。
四 DefaultAbstractSerializer# proxyRequest
/**
 * Creates a JDK dynamic proxy for the service interface in {@code invokerConfig}.
 *
 * The proxy implements only {@code invokerConfig.getServiceInterface()} and
 * routes calls to a {@code ServiceInvocationProxy} built from the config and
 * the handler chosen by {@code InvokerProcessHandlerFactory.selectInvocationHandler}.
 *
 * @param invokerConfig configuration supplying the class loader and service interface
 * @return the proxy instance created by {@code Proxy.newProxyInstance}
 * @throws SerializationException declared by the overridden method
 */
@Override
public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
    final ClassLoader loader = ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader());
    final Class<?>[] proxiedInterfaces = new Class<?>[] { invokerConfig.getServiceInterface() };
    final ServiceInvocationProxy invocationProxy = new ServiceInvocationProxy(
            invokerConfig,
            InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig));
    return Proxy.newProxyInstance(loader, proxiedInterfaces, invocationProxy);
}
public class ServiceInvocationProxy implements InvocationHandler
public class ServiceInvocationProxy implements InvocationHandler { private static final Logger logger = LoggerLoader.getLogger(ServiceInvocationProxy.class); private InvokerConfig<?> invokerConfig; private ServiceInvocationHandler handler; public ServiceInvocationProxy(InvokerConfig<?> invokerConfig, ServiceInvocationHandler handler) { this.invokerConfig = invokerConfig; this.handler = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(handler, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return handler.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return handler.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return handler.equals(args[0]); } return extractResult( handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)), method.getReturnType()); }
其实就是调用第三步中初始化好的责任链,其中就包括了负载均衡、发送 TCP 请求等底层逻辑。
五 ClientManager.getInstance().registerClients
实际调用的是 ClientManager 中的内部类
class InnerServiceProviderChangeListener implements ServiceProviderChangeListener { @Override public void providerAdded(ServiceProviderChangeEvent event) { if (logger.isInfoEnabled()) { logger.info("add " + event.getHost() + ":" + event.getPort() + ":" + event.getWeight() + " to " + event.getServiceName()); } registerClient(event.getServiceName(), event.getHost(), event.getPort(), event.getWeight(),event.getGroup()); }
public void registerClient(String serviceName, String host, int port, int weight,String group) { ConnectInfo connectInfo = new ConnectInfo(serviceName, host, port, weight,group); this.clusterListenerManager.addConnect(connectInfo); RegistryManager.getInstance().addServiceAddress(serviceName, host, port, weight,group);//这部分都是把服务端缓存起来的逻辑 }
创建netty连接的逻辑在 this.clusterListenerManager.addConnect(connectInfo);
实际调用的是 DefaultClusterListener.addConnect
public void addConnect(ConnectInfo connectInfo) { if (logger.isInfoEnabled()) { logger.info("[cluster-listener] add service provider:" + connectInfo); } Client client = this.allClients.get(connectInfo.getConnect()); if (clientExisted(connectInfo)) { if (client != null) { for (Map<String, List<Client>> clientMap : serviceClients.values()) { for (List<Client> clientList : clientMap.values()) { int idx = clientList.indexOf(client); if (idx >= 0 && clientList.get(idx) != client) { closeClientInFuture(client); } } } } else { return; } } if (client == null) { client = ClientSelector.selectClient(connectInfo);//创建tcp的客户端 netty实现的 } if (!this.allClients.containsKey(connectInfo.getConnect())) { Client oldClient = this.allClients.putIfAbsent(connectInfo.getConnect(), client);//进行缓存 if (oldClient != null) { client = oldClient; } }
代码很多,重点说下
1 建立netty客户端连接,是和每一个服务端都建立一个netty连接
2 监听zk节点,如果有服务端下线,去掉对应的连接
以上是关于Pigeon源码分析 -- 客户端调用源码分析的主要内容,如果未能解决你的问题,请参考以下文章