Spring-Cloud系列-Openfeign源码解析

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring-Cloud系列-Openfeign源码解析相关的知识,希望对你有一定的参考价值。

这一篇我们主要来看下OpenFeign是怎样解析调用以及负载处理的。

@FeignClient(value = "producer-server")
public interface FeignConsumerClient 

    @RequestMapping(value = "producer/simple",method = RequestMethod.GET)
    public String producerSimple();


​ 我们这里是对producer-server微服务的producer/simple方法的调用。看到主要会有两个主要的疑问:一是怎样通过微服务的名称producer-server找到这个集群对应的urls也就是其实际的url地址。其二是其内部是是怎样解析数据的传输、获取的以及其是怎样对接口的数据进行封装还有对返回数据进行解析已经转换为方法的返回类的。

一、服务名称获取对应实际地址

​ 这个主要是从注册中心获取微服务对应的地址。

@RequestMapping(value = "simple",method = RequestMethod.GET)
public String simpleMethod()
    return feignConsumerClient.producerSimple();

​ 我们这里是一个简单的调用,这里对应的feignConsumerClientopenfeign生成的一个代理类,执行是是进入ReflectiveFeign处理

1、ReflectiveFeign

public class ReflectiveFeign extends Feign 

​ 我们看到如果不是一些equals这些本身的基本方法,就是通过dispatch来处理,这里有点类似于SpringMvcDispatcherServlet分发处理了。

private final Map<Method, MethodHandler> dispatch;

​ 这个dispatch是一个路由map,通过Method找到该方法的实际封装处理类MethodHandler

2、SynchronousMethodHandler

final class SynchronousMethodHandler implements MethodHandler 
@Override
public Object invoke(Object[] argv) throws Throwable 
  RequestTemplate template = buildTemplateFromArgs.create(argv);
  Options options = findOptions(argv);
  Retryer retryer = this.retryer.clone();
  while (true) 
    try 
      return executeAndDecode(template, options);
     catch (RetryableException e) 
      try 
        retryer.continueOrPropagate(e);
       catch (RetryableException th) 
        Throwable cause = th.getCause();
        if (propagationPolicy == UNWRAP && cause != null) 
          throw cause;
         else 
          throw th;
        
      
      if (logLevel != Logger.Level.NONE) 
        logger.logRetry(metadata.configKey(), logLevel);
      
      continue;
    
  

private final RequestTemplate.Factory buildTemplateFromArgs;

​ 这里先是生成一个Http的请求模板类RequestTemplate,这个类如果我们有使用RequestInterceptor的话可能有遇到过,例如如果我们通过opengeign去调用其他的地址要加什么头部等内容的话,就会通过这个拓展接口去处理,那这个RequestInterceptor接口是在上面时候调用的呢?就是在executeAndDecode方法:

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable 
  Request request = targetRequest(template);
	......
  Response response;
  long start = System.nanoTime();
  try 
    response = client.execute(request, options);
    response = response.toBuilder()
        .request(request)
        .requestTemplate(template)
        .build();
   catch (IOException e) 
    .........
  
  long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  if (decoder != null)
    return decoder.decode(response, metadata.returnType());
  CompletableFuture<Object> resultFuture = new CompletableFuture<>();
  asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
      metadata.returnType(),
      elapsedTime);
  try 
		.........
    return resultFuture.join();
   catch (CompletionException e) 
    Throwable cause = e.getCause();
    if (cause != null)
      throw cause;
    throw e;
  

Request targetRequest(RequestTemplate template) 
  for (RequestInterceptor interceptor : requestInterceptors) 
    interceptor.apply(template);
  
  return target.apply(template);

​ 在前面就通过targetRequest来前置处理用户拓展的内容,然后就正式通过client.execute(request, options)来发送请求:

这里的client的实现类是LoadBalancerFeignClient:

3、LoadBalancerFeignClient

​ 可以看到我们目前的地址还是微服务的名称,并不是实际的ip地址。这里接下来会有两部处理,先是从注册中心获取这个微服务对应的ip地址集合、然后再进行负载的处理选择一个地址。

private FeignLoadBalancer lbClient(String clientName) 
   return this.lbClientFactory.create(clientName);

​ 这里就是通过clientName当前是producer-server,也就是生产者集群获取其的服务列表,例如ip端口这些信息。这个是在lb也就是IloadBalancer中的(当前的实现类是ZoneAwareLoadBalancer),这个是通过factory创建:

public FeignLoadBalancer create(String clientName) 
   FeignLoadBalancer client = this.cache.get(clientName);
   if (client != null) 
      return client;
   
   IClientConfig config = this.factory.getClientConfig(clientName);
   ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
   ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
         ServerIntrospector.class);
   client = this.loadBalancedRetryFactory != null
         ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
               this.loadBalancedRetryFactory)
         : new FeignLoadBalancer(lb, config, serverIntrospector);
   this.cache.put(clientName, client);
   return client;

​ 其逻辑是有一个缓存。那这里的两个列表是怎样获取的呢?也是在项目启动,因为会配置fetchRegistry属性(默认为true),即从注册中心获取消息,这个时候会将处理获取封装到这里,然后就是通过定时任务同步,或服务主动注册修改。

4、对应待调用的微服务的ip信息获取

1)、自动拉取

​ 配置主动拉取的话其是在进行DiscoveryClient初始化创建的时候(也就是eureka客户端的核心类,本来准备先写文章介绍eureka的,但一些概念性的东西,还有一些细节感觉不能很简单的描述出来,所以就没先写eureka的内容),进行处理的,如果判断当前客户端需要拉取注册中心的内容,就会调用getAndStoreFullRegistry,从注册中心获取已经注册的内容然后放到localRegionApps中,

private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private void getAndStoreFullRegistry() throws Throwable 
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) 
        apps = httpResponse.getEntity();
    
    logger.info("The response status is ", httpResponse.getStatusCode());

    if (apps == null) 
        logger.error("The application is null for some reason. Not storing this information");
     else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) 
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode ", apps.getAppsHashCode());
     else 
        logger.warn("Not updating applications as another thread is updating it already");
    

​ 这个列表同样也有定时任务维护。当获取到Applications,这个也就是注册的服务信息后,然后在IloadBalancer实现类测试化的时候,也同样会将allServerListupServerList这些列表更新。

2)、定时任务

​ 定时任务的话就是DynamicServerListLoadBalancer类中的ServerListUpdater.UpdateAction处理

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() 
    @Override
    public void doUpdate() 
        updateListOfServers();
    
;

​ 我们前面提到IloadBalancer的当前实现是ZoneAwareLoadBalancerZoneAwareLoadBalancer是继承的DynamicServerListLoadBalancer

​ 我们前面的那些allServerListupServerList列表是在基类BaseLoadBalancer中,上面的updateListOfServers就是定时更新 allServerList这些列表,而这些列表就是从服务端获取的实例列表List<InstanceInfo>这些实例列表又是从我们前面提到的DiscoveryClient中获取的。

​ 然后通过实例信息就能获取ip这些信息,之后就能通过updateListOfServers方法发起更新到allServerList这些列表。

二、负载选取

​ 前面我们梳理的是producer-server的集群ip获取,下面我们来梳理下在接口调用时的负载选取,我们回到前面的LoadBalancerFeignClient的调用方法:

public Response execute(Request request, Request.Options options) throws IOException 
   try 
      URI asUri = URI.create(request.url());
      String clientName = asUri.getHost();
      URI uriWithoutHost = cleanUrl(request.url(), clientName);
      FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
            this.delegate, request, uriWithoutHost);

      IClientConfig requestConfig = getClientConfig(options, clientName);
      return lbClient(clientName)
            .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
   

​ 下面就是executeWithLoadBalancer,也就是通过LoadBalancerFeignClient的方法执行:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException 
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try 
        return command.submit(
            new ServerOperation<T>() 
                @Override
                public Observable<T> call(Server server) 
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try 
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                     
                    catch (Exception e) 
                        return Observable.error(e);
                    
                
            )
            .toBlocking()
            .single();
     catch (Exception e) 
        Throwable t = e.getCause();
        if (t instanceof ClientException) 
            throw (ClientException) t;
         else 
            throw new ClientException(e);
        
    
    

​ 这里的执行是通过RxJava这个异步执行框架,感兴趣可以去了解下,但我们可以简单了解这里的主要逻辑是LoadBalancerCommand处理的,其command.submit里面的主要逻辑就是封装对应的逻辑交给RxJava去处理:

public Observable<T> submit(final ServerOperation<T> operation) 
    final ExecutionInfoContext context = new ExecutionInfoContext();
   		.....
    // Use the load balancer
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() 
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) 
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    // Called for each attempt and retry
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() 
                                @Override
                                public Observable<T> call(final Server server) 
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);
                                    
                                    if (listenerInvoker != null) 
                                        try 
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                         catch (AbortExecutionException e) 
                                            return Observable.error(e);
                                        
                                    
                                    
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    
                                    return operation.call(server).doOnEach(new Observer<T>() 
 						........

​ 这里的一大串很多,如果不是比较了解RxJava的话,看的是眼花缭乱。我们抓住重点,一个就是通过selectServer()方法选择最终调用的那台机的信息:

private Observable<Server> selectServer() 
    return Observable.create(new OnSubscribe<Server>() 
        @Override
        public void call(Subscriber<? super Server> next) 
            try 
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
             catch (Exception e) 
                next.onError(e);
            
        
    );

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException 
		.........
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) 
        if (lb != null)
            Server svc = lb.chooseServer(loadBalancerKey);
            if (svc == null)
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Load balancer does not have available server for client: "
                                + clientName);
            
            host = svc.getHost();
            if (host == null)
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Invalid Server for :" + svc);
            
            logger.debug(" using LB returned Server:  for request ", new Object[]clientName, svc, original);
            return svc;
       ..SpringBoot引入openfeign 报错:spring-cloud-starter-openfeign:unknown

day07-OpenFeign-服务调用

在 openfeign 2.1.1.RELEASE 中找不到 @EnableFeignClients

分布式系列接口调用openfeign小试牛刀---openfeign接口是如何注入spring的

SpringCloud系列——openfeign远程服务调用实战

Spring-Cloud系列-Hystrix源码解析