Spring-Cloud系列-Openfeign源码解析
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring-Cloud系列-Openfeign源码解析相关的知识,希望对你有一定的参考价值。
这一篇我们主要来看下OpenFeign是怎样解析调用以及负载处理的。
@FeignClient(value = "producer-server")
public interface FeignConsumerClient
@RequestMapping(value = "producer/simple",method = RequestMethod.GET)
public String producerSimple();
我们这里是对producer-server
微服务的producer/simple
方法的调用。看到主要会有两个主要的疑问:一是怎样通过微服务的名称producer-server
找到这个集群对应的urls
也就是其实际的url
地址。其二是其内部是是怎样解析数据的传输、获取的以及其是怎样对接口的数据进行封装还有对返回数据进行解析已经转换为方法的返回类的。
一、服务名称获取对应实际地址
这个主要是从注册中心获取微服务对应的地址。
@RequestMapping(value = "simple",method = RequestMethod.GET)
public String simpleMethod()
return feignConsumerClient.producerSimple();
我们这里是一个简单的调用,这里对应的feignConsumerClient
是openfeign
生成的一个代理类,执行是是进入ReflectiveFeign
处理
1、ReflectiveFeign
public class ReflectiveFeign extends Feign
我们看到如果不是一些equals
这些本身的基本方法,就是通过dispatch
来处理,这里有点类似于SpringMvc
的DispatcherServlet
分发处理了。
private final Map<Method, MethodHandler> dispatch;
这个dispatch
是一个路由map,通过Method
找到该方法的实际封装处理类MethodHandler
2、SynchronousMethodHandler
final class SynchronousMethodHandler implements MethodHandler
@Override
public Object invoke(Object[] argv) throws Throwable
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
Retryer retryer = this.retryer.clone();
while (true)
try
return executeAndDecode(template, options);
catch (RetryableException e)
try
retryer.continueOrPropagate(e);
catch (RetryableException th)
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null)
throw cause;
else
throw th;
if (logLevel != Logger.Level.NONE)
logger.logRetry(metadata.configKey(), logLevel);
continue;
private final RequestTemplate.Factory buildTemplateFromArgs;
这里先是生成一个Http
的请求模板类RequestTemplate
,这个类如果我们有使用RequestInterceptor
的话可能有遇到过,例如如果我们通过opengeign
去调用其他的地址要加什么头部等内容的话,就会通过这个拓展接口去处理,那这个RequestInterceptor
接口是在上面时候调用的呢?就是在executeAndDecode
方法:
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable
Request request = targetRequest(template);
......
Response response;
long start = System.nanoTime();
try
response = client.execute(request, options);
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
catch (IOException e)
.........
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null)
return decoder.decode(response, metadata.returnType());
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try
.........
return resultFuture.join();
catch (CompletionException e)
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
Request targetRequest(RequestTemplate template)
for (RequestInterceptor interceptor : requestInterceptors)
interceptor.apply(template);
return target.apply(template);
在前面就通过targetRequest
来前置处理用户拓展的内容,然后就正式通过client.execute(request, options)
来发送请求:
这里的client
的实现类是LoadBalancerFeignClient
:
3、LoadBalancerFeignClient
可以看到我们目前的地址还是微服务的名称,并不是实际的ip
地址。这里接下来会有两部处理,先是从注册中心获取这个微服务对应的ip
地址集合、然后再进行负载的处理选择一个地址。
private FeignLoadBalancer lbClient(String clientName)
return this.lbClientFactory.create(clientName);
这里就是通过clientName
当前是producer-server
,也就是生产者集群获取其的服务列表,例如ip
、端口
这些信息。这个是在lb
也就是IloadBalancer
中的(当前的实现类是ZoneAwareLoadBalancer
),这个是通过factory
创建:
public FeignLoadBalancer create(String clientName)
FeignLoadBalancer client = this.cache.get(clientName);
if (client != null)
return client;
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
ServerIntrospector.class);
client = this.loadBalancedRetryFactory != null
? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
this.loadBalancedRetryFactory)
: new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
其逻辑是有一个缓存。那这里的两个列表是怎样获取的呢?也是在项目启动,因为会配置fetchRegistry
属性(默认为true
),即从注册中心获取消息,这个时候会将处理获取封装到这里,然后就是通过定时任务同步,或服务主动注册修改。
4、对应待调用的微服务的ip信息获取
1)、自动拉取
配置主动拉取的话其是在进行DiscoveryClient
初始化创建的时候(也就是eureka
客户端的核心类,本来准备先写文章介绍eureka
的,但一些概念性的东西,还有一些细节感觉不能很简单的描述出来,所以就没先写eureka
的内容),进行处理的,如果判断当前客户端需要拉取注册中心的内容,就会调用getAndStoreFullRegistry
,从注册中心获取已经注册的内容然后放到localRegionApps
中,
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private void getAndStoreFullRegistry() throws Throwable
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode())
apps = httpResponse.getEntity();
logger.info("The response status is ", httpResponse.getStatusCode());
if (apps == null)
logger.error("The application is null for some reason. Not storing this information");
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1))
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode ", apps.getAppsHashCode());
else
logger.warn("Not updating applications as another thread is updating it already");
这个列表同样也有定时任务维护。当获取到Applications
,这个也就是注册的服务信息后,然后在IloadBalancer
实现类测试化的时候,也同样会将allServerList
、upServerList
这些列表更新。
2)、定时任务
定时任务的话就是DynamicServerListLoadBalancer
类中的ServerListUpdater.UpdateAction
处理
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction()
@Override
public void doUpdate()
updateListOfServers();
;
我们前面提到IloadBalancer
的当前实现是ZoneAwareLoadBalancer
,ZoneAwareLoadBalancer
是继承的DynamicServerListLoadBalancer
:
我们前面的那些allServerList
、upServerList
列表是在基类BaseLoadBalancer
中,上面的updateListOfServers
就是定时更新 allServerList
这些列表,而这些列表就是从服务端获取的实例列表List<InstanceInfo>
这些实例列表又是从我们前面提到的DiscoveryClient
中获取的。
然后通过实例信息就能获取ip
这些信息,之后就能通过updateListOfServers
方法发起更新到allServerList
这些列表。
二、负载选取
前面我们梳理的是producer-server
的集群ip
获取,下面我们来梳理下在接口调用时的负载选取,我们回到前面的LoadBalancerFeignClient
的调用方法:
public Response execute(Request request, Request.Options options) throws IOException
try
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
下面就是executeWithLoadBalancer
,也就是通过LoadBalancerFeignClient
的方法执行:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try
return command.submit(
new ServerOperation<T>()
@Override
public Observable<T> call(Server server)
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
catch (Exception e)
return Observable.error(e);
)
.toBlocking()
.single();
catch (Exception e)
Throwable t = e.getCause();
if (t instanceof ClientException)
throw (ClientException) t;
else
throw new ClientException(e);
这里的执行是通过RxJava
这个异步执行框架,感兴趣可以去了解下,但我们可以简单了解这里的主要逻辑是LoadBalancerCommand
处理的,其command.submit
里面的主要逻辑就是封装对应的逻辑交给RxJava
去处理:
public Observable<T> submit(final ServerOperation<T> operation)
final ExecutionInfoContext context = new ExecutionInfoContext();
.....
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>()
@Override
// Called for each server being selected
public Observable<T> call(Server server)
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>()
@Override
public Observable<T> call(final Server server)
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
if (listenerInvoker != null)
try
listenerInvoker.onStartWithServer(context.toExecutionInfo());
catch (AbortExecutionException e)
return Observable.error(e);
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
return operation.call(server).doOnEach(new Observer<T>()
........
这里的一大串很多,如果不是比较了解RxJava
的话,看的是眼花缭乱。我们抓住重点,一个就是通过selectServer()
方法选择最终调用的那台机的信息:
private Observable<Server> selectServer()
return Observable.create(new OnSubscribe<Server>()
@Override
public void call(Subscriber<? super Server> next)
try
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
catch (Exception e)
next.onError(e);
);
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException
.........
ILoadBalancer lb = getLoadBalancer();
if (host == null)
if (lb != null)
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null)
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
host = svc.getHost();
if (host == null)
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
logger.debug(" using LB returned Server: for request ", new Object[]clientName, svc, original);
return svc;
..SpringBoot引入openfeign 报错:spring-cloud-starter-openfeign:unknown
在 openfeign 2.1.1.RELEASE 中找不到 @EnableFeignClients
分布式系列接口调用openfeign小试牛刀---openfeign接口是如何注入spring的