SpringCloud升级之路2020.0.x版-27.OpenFeign的生命周期-创建代理
Posted 干货满满张哈希
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud升级之路2020.0.x版-27.OpenFeign的生命周期-创建代理相关的知识,希望对你有一定的参考价值。
接下来,我们开始分析 OpenFeign 的生命周期,结合 OpenFeign 本身的源代码。首先是从接口定义创建 OpenFeign 代理开始。我们这里只关心同步客户端,因为异步客户端目前还在实现中,并且在我们的项目中,异步响应式的客户端不用 OpenFeign,而是用的官方的 WebClient
创建 OpenFeign 代理
创建 OpenFeign 代理,主要分为以下几步:
- 使用 Contract 解析接口的每一个方法,生成每一个方法的元数据列表:
List<MethodMetadata> metadata
- 根据每一个
MethodMetadata
,生成对应的请求模板工厂RequestTemplate.Factory
,用于生成后面的请求。同时,使用这个模板工厂以及其他配置生成对应的方法处理器MethodHandler
,对于同步的 OpenFeign,MethodHandler
实现为SynchronousMethodHandler
。将接口方法与MethodHandler
一一对应建立映射,结果为Map<Method, MethodHandler> methodToHandler
。对于 Java 8 引入的 interface default 方法,需要用不同MethodHandler
,即DefaultMethodHandler
,因为这种方法不用代理,不用生成对应的 http 调用,其实现为直接调用对应的 default 方法代码。 - 使用 InvocationHandlerFactory 这个工厂,创建
InvocationHandler
用于代理调用。 - 调用 JDK 动态代理生成类方法使用
InvocationHandler
创建代理类。
创建 OpenFeign 代理,主要基于 JDK 的动态代理实现。我们先举一个简单的例子,创建一个 JDK 动态代理,用来类比。
JDK 动态代理
使用 JDK 动态代理,需要如下几个步骤:
1. 编写接口以及对应的代理类。我们这里编写一个简单的接口和对应的实现类:
public interface TestService {
void test();
}
public class TestServiceImpl implements TestService {
@Override
public void test() {
System.out.println("TestServiceImpl#test is called");
}
}
2.创建代理类实现java.lang.reflect.InvocationHandler
,并且,在核心方法中,调用实际的对象,这里即我们上面 TestService 的实现类 TestServiceImpl 的对象。
JDK 中有内置的动态代理 API,其核心是 java.lang.reflect.InvocationHandler
。我们先来创建一个简单的 InvocationHandler
实现类:
public class SimplePrintMethodInvocationHandler implements InvocationHandler {
private final TestService testService;
public SimplePrintMethodInvocationHandler(TestService testService) {
this.testService = testService;
}
@Override
public Object invoke(
//代理对象
Object proxy,
//调用的方法
Method method,
//使用的参数
Object[] args)
throws Throwable {
System.out.println("Invoked method: " + method.getName());
//进行实际的调用
return method.invoke(testService, args);
}
}
3.创建代理对象,并使用代理对象调用。一般通过 Proxy 的静态方法去创建,例如:
//首先,创建要代理的对象
TestServiceImpl testServiceImpl = new TestServiceImpl();
//然后使用要代理的对象创建对应的 InvocationHandler
SimplePrintMethodInvocationHandler simplePrintMethodInvocationHandler = new SimplePrintMethodInvocationHandler(testServiceImpl);
//创建代理类,因为一个类可能实现多个接口,所以这里返回的是 Object,用户根据自己需要强制转换成要用的接口
Object proxyInstance = Proxy.newProxyInstance(
TestService.class.getClassLoader(),
testServiceImpl.getClass().getInterfaces(),
simplePrintMethodInvocationHandler
);
//强制转换
TestService proxied = (TestService) proxyInstance;
//使用代理对象进行调用
proxied.test();
这样,我们就使用了 JDK 的内置动态代理机制实现了一个简单的动态代理。在 OpenFeign 的使用中,和我们的示例有一点区别。首先,我们只需要定义要代理的接口,不用定义实现类。因为所有的 OpenFeign 接口要做的事情其实都是 HTTP 调用,其信息可以自动从接口定义中生成,我们可以使用统一的对象根据接口定义,承载 OpenFeign 接口定义的请求。在 OpenFeign 中,这个等同于实现对象的,就是根据接口生成的 MethodHandler,在同步的 OpenFeign 中,即 feign.SynchronousMethodHandler
。之后,OpenFeign 创建的 InvocationHandler,其实就是将调用转发到对应的 SynchronousMethodHandler
的对应方法。
创建 OpenFeign 代理对象的流程详解
我们使用前面的例子,来看一下创建代理的流程:
interface GitHub {
/**
* 定义get方法,包括路径参数,响应返回序列化类
* @param owner
* @param repository
* @return
*/
@RequestLine("GET /repos/{owner}/{repo}/contributors")
List<Contributor> contributors(@Param("owner") String owner, @Param("repo") String repository);
/**
* 响应体结构类
*/
class Contributor {
String login;
int contributions;
public Contributor() {
}
public String getLogin() {
return login;
}
public void setLogin(String login) {
this.login = login;
}
public int getContributions() {
return contributions;
}
public void setContributions(int contributions) {
this.contributions = contributions;
}
}
}
/**
* 基于 FastJson 的反序列化解码器
*/
static class FastJsonDecoder implements Decoder {
@Override
public Object decode(Response response, Type type) throws IOException, DecodeException, FeignException {
//读取 body
byte[] body = response.body().asInputStream().readAllBytes();
return JSON.parseObject(body, type);
}
}
public static void main(String[] args) {
//创建 Feign 代理的 HTTP 调用接口实现
GitHub github = Feign.builder()
//指定解码器为 FastJsonDecoder
.decoder(new FastJsonDecoder())
//指定代理类为 GitHub,基址为 https://api.github.com
.target(GitHub.class, "https://api.github.com");
List<GitHub.Contributor> contributors = github.contributors("OpenFeign", "feign");
}
我们这里关心的其实就是创建 Feign 代理的 HTTP 调用接口实现这一步的内部流程。首先我们来看 Feign 的 Builder 的结构,当我们初始化一个 Feign 的 Builder 也就是调用 Feign.builder()
时,会创建如下组件(同时也说明以下组件都是可以配置的,如果一些配置之前没有提到,则可以):
//请求拦截器列表,默认为空
private final List<RequestInterceptor> requestInterceptors = new ArrayList();
//日志级别,默认不打印任何日志
private Level logLevel = Level.NONE;
//负责解析类元数据的 Contract,默认是支持 OpenFeign 内置注解的默认 Contract
private Contract contract = new Contract.Default();
//承载 HTTP 请求的 Client,默认是基于 Java HttpURLConnection 的 Default Client
private Client client = new feign.Client.Default((SSLSocketFactory)null, (HostnameVerifier)null);
//重试器,默认也是 Default
private Retryer retryer = new feign.Retryer.Default();
//默认的日志 Logger,默认不记录任何日志
private Logger logger = new NoOpLogger();
//编码器解码器也是默认的
private Encoder encoder = new feign.codec.Encoder.Default();
private Decoder decoder = new feign.codec.Decoder.Default();
//查询参数编码,这个我们一般不会修改
private QueryMapEncoder queryMapEncoder = new FieldQueryMapEncoder();
//错误编码器,默认为 Default
private ErrorDecoder errorDecoder = new feign.codec.ErrorDecoder.Default();
//各种超时的 Options 走的默认配置
private Options options = new Options();
//用来生成 InvocationHandler 的 Factory 也是默认的
private InvocationHandlerFactory invocationHandlerFactory = new feign.InvocationHandlerFactory.Default();
//是否特殊解析 404 错误,因为针对 404 我们可能不想抛出异常,默认是 false
private boolean decode404 = false;
//是否在解码后立刻关闭 Response,默认为是
private boolean closeAfterDecode = true;
//异常传播规则,默认是不传播
private ExceptionPropagationPolicy propagationPolicy = ExceptionPropagationPolicy.NONE;
//是否强制解码,这个主要为了兼容异步 Feign 引入的配置,我们直接忽略,认为他就是 false 即可
private boolean forceDecoding = false;
private List<Capability> capabilities = new ArrayList();
我们的代码中指定了指定解码器为 FastJsonDecoder,所以 Decoder 就是 FastJsonDecoder 了。最后通过 target(GitHub.class, "https://api.github.com");
指定定代理类为 GitHub,基址为 https://api.github.com,这时候就会生成 Feign 代理类,其步骤是:
public <T> T target(Class<T> apiType, String url) {
//使用代理接口类型,以及基址创建 HardCodedTarget,他的意思其实就是硬编码的 Target
return target(new HardCodedTarget<T>(apiType, url));
}
public <T> T target(Target<T> target) {
return build().newInstance(target);
}
public Feign build() {
//将所有组件经过所有的 Capability,从这里我们可以看出,我们可以实现 Capability 接口来在创建 Feign 代理的时候动态修改组件
Client client = Capability.enrich(this.client, capabilities);
Retryer retryer = Capability.enrich(this.retryer, capabilities);
List<RequestInterceptor> requestInterceptors = this.requestInterceptors.stream()
.map(ri -> Capability.enrich(ri, capabilities))
.collect(Collectors.toList());
Logger logger = Capability.enrich(this.logger, capabilities);
Contract contract = Capability.enrich(this.contract, capabilities);
Options options = Capability.enrich(this.options, capabilities);
Encoder encoder = Capability.enrich(this.encoder, capabilities);
Decoder decoder = Capability.enrich(this.decoder, capabilities);
InvocationHandlerFactory invocationHandlerFactory =
Capability.enrich(this.invocationHandlerFactory, capabilities);
QueryMapEncoder queryMapEncoder = Capability.enrich(this.queryMapEncoder, capabilities);
//创建 SynchronousMethodHandler 的 Factory,用于生成 SynchronousMethodHandler,SynchronousMethodHandler 是实际承载 Feign 代理请求的实现类
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
//通过方法名称来区分不同接口方法的元数据解析,用于生成并路由到对应的代理方法
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
//创建 ReflectiveFeign
return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}
}
创建 ReflectiveFeign 之后,会调用其中的 newInstance 方法:
public <T> T newInstance(Target<T> target) {
//使用前面提到的 ParseHandlersByName 解析元数据并生成所有需要代理的方法的 MethodHandler,我们这里分析的是同步 Feign,所以是 SynchronousMethodHandler
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
//将方法与对应的 MethodHandler 一一对应
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
//对于 Object 的方法,直接跳过
continue;
} else if (Util.isDefault(method)) {
//如果是 java 8 中接口的默认方法,就使用 DefaultMethodHandler 处理
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
//使用前面 Builder 中的 InvocationHandlerFactory 创建 InvocationHandler
InvocationHandler handler = factory.create(target, methodToHandler);
//使用 InvocationHandler 创建 Proxy
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
new Class<?>[] {target.type()}, handler);
//将代理与 DefaultMethodHandler 关联
for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
对于使用前面提到的 ParseHandlersByName 解析元数据并生成所有需要代理的方法的 MethodHandler 这一步,主要就涉及到了使用 Contract 解析出方法的元数据,然后将这些元数据用对应的编码器绑定用于之后调用的编码:
public Map<String, MethodHandler> apply(Target target) {
// 使用 Contract 解析出所有方法的元数据
List<MethodMetadata> metadata = contract.parseAndValidateMetadata(target.type());
Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
// 对于每个解析出的方法元数据
for (MethodMetadata md : metadata) {
BuildTemplateByResolvingArgs buildTemplate;
if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
//有表单的情况
buildTemplate =
new BuildFormEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
} else if (md.bodyIndex() != null) {
//有 body 的情况
buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
} else {
//其他情况
buildTemplate = new BuildTemplateByResolvingArgs(md, queryMapEncoder, target);
}
if (md.isIgnored()) {
result.put(md.configKey(), args -> {
throw new IllegalStateException(md.configKey() + " is not a method handled by feign");
});
} else {
// 使用 SynchronousMethodHandler 的 Factory 生成 SynchronousMethodHandler
result.put(md.configKey(),
factory.create(target, md, buildTemplate, options, decoder, errorDecoder));
}
}
return result;
}
}
默认的 InvocationHandlerFactory 生成的 InvocationHandler 是 ReflectiveFeign.FeignInvocationHandler:
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
}
}
其中的内容是:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 对于 equals,hashCode,toString 方法直接调用
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
//对于其他方法,调用对应的 SynchronousMethodHandler 进行处理
return dispatch.get(method).invoke(args);
}
从这里我们就可以看出,我们生成的 Proxy,其实就是将请求代理到了 SynchronousMethodHandler 上。
我们这一节详细介绍了 OpenFeign 创建代理的详细流程,可以看出,对于同步 Feign 生成的 Proxy,其实就是将接口 HTTP 请求定义的方法请求代理到了 SynchronousMethodHandler 上。下一节我们会详细分析 SynchronousMethodHandler 做实际 HTTP 调用的流程,来搞清楚 Feign 所有组件是如何协调工作的。
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer:
SpringCloud升级之路2020.0.x版-41. SpringCloudGateway 基本流程讲解
我们继续分析上一节提到的 WebHandler
。加入 Spring Cloud Sleuth 以及 Prometheus 相关依赖之后, Spring Cloud Gateway 的处理流程如下所示:
Spring Cloud Gateway 入口 -> WebFlux 的 DefaultWebFilterChain
Spring Cloud Gateway 是基于 Spring WebFlux 开发的异步响应式网关,异步响应式代码比较难以理解和阅读,我这里给大家分享一种方法去理解,通过这个流程来理解 Spring Cloud Gateway 的工作流程以及底层原理。其实可以理解为,上图这个流程,就是拼出来一个完整的 Mono(或者 Flux)流,最后 subscribe 执行。
当收到一个请求的时候,会经过 org.springframework.web.server.handler.DefaultWebFilterChain
,这是 WebFilter 的调用链,这个链路包括三个 WebFilter:
org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter
:添加 Prometheus 相关依赖之后,会有这个 MetricsWebFilter,用于记录请求处理耗时,采集相关指标。org.springframework.cloud.sleuth.instrument.web.TraceWebFilter
:添加 Spring Cloud Sleuth 相关依赖之后,会有这个 TraceWebFilter。org.springframework.cloud.gateway.handler.predicate.WeightCalculatorWebFilter
:Spring Cloud Gateway 路由权重相关配置功能相关实现类,这个我们这里不关心。
在这个 DefaultWebFilterChain
会形成这样一个 Mono,我们依次将他们标记出来,首先是入口代码 org.springframework.web.server.handler.DefaultWebFilterChain#filter
:
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() ->
// this.currentFilter != null 代表 WebFilter 链还没有结束
// this.chain != null 代表 WebFilter 链不为空
this.currentFilter != null && this.chain != null ?
//在 WebFilter 链没有结束的情况下,调用 WebFilter
invokeFilter(this.currentFilter, this.chain, exchange) :
//在 WebFilter 结束的情况下,调用 handler
this.handler.handle(exchange));
}
对于我们这里的 WebFilter 链的第一个 MetricsWebFilter
,假设启用了对应的采集统计的话,这时候生成的 Mono 就是:
return Mono.defer(() ->
chain.filter(exchange).transformDeferred((call) -> {
long start = System.nanoTime();
return call
//成功时,记录响应时间
.doOnSuccess((done) -> MetricsWebFilter.this.onSuccess(exchange, start))
//失败时,记录响应时间和异常
.doOnError((cause) -> MetricsWebFilter.this.onError(exchange, start, cause));
});
);
这里为了方便,我们对代码做了简化,由于我们要将整个链路的所有 Mono 和 Flux 拼接在一起行程完整链路,所以原本是 MetricsWebFilter
中的 onSuccess(exchange, start)
方法,被改成了 MetricsWebFilter.this.onSuccess(exchange, start)
这种伪代码。
接着,根据DefaultWebFilterChain
的源码分析,chain.filter(exchange)
会继续 WebFilter 链路,到达下一个 WebFilter,即 TraceWebFilter
。经过 TraceWebFilter
,Mono 就会变成:
return Mono.defer(() ->
new MonoWebFilterTrace(source, chain.filter(exchange), TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
可以看出,在 TraceWebFilter
中,整个内部 Mono (chain.filter(exchange)
后续的结果)都被封装成了一个 MonoWebFilterTrace
,这也是保持链路追踪信息的关键实现。
继续 WebFilter 链路,经过最后一个 WebFilter WeightCalculatorWebFilter
; 这个 WebFilter 我们不关心,里面对路由权重做了一些计算操作,我们这里直接忽略即可。这样我们就走完了所有 WebFilter 链路,来到了最后的调用 DefaultWebFilterChain.this.handler
,这个 handler 就是 org.springframework.web.reactive.DispatcherHandler
。在 DispatcherHandler 中,我们会计算出路由并发送请求到符合条件的 GatewayFilter。经过 DispatcherHandler,Mono 会变成:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
Flux.fromIterable(DispatcherHandler.this.handlerMappings) //读取所有的 handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange)) //按顺序调用所有的 handlerMappings 的 getHandler 方法,如果有对应的 Handler 会返回,否则返回 Mono.empty();
.next() //找到第一个返回不是 Mono.empty() 的 Handler
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
handlerMappings 包括:
org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndPointHandlerMapping
:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。Actuator 相关路径映射,不是我们这里关心的。但是可以看出,Actuator 相关路径优先于 Spring Cloud Gateway 配置路由org.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMapping
:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。使用 @ControllerEndpoint 或者 @RestControllerEndpoint 注解标注的 Actuator 相关路径映射,不是我们这里关心的。org.springframework.web.reactive.function.server.support.RouterFunctionMapping
:在 Spring-WebFlux 中,你可以定义很多不同的 RouterFunction 来控制路径路由,但这也不是我们这里关心的。但是可以看出,自定义的 RouterFunction 会优先于 Spring Cloud Gateway 配置路由org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping
:针对 @RequestMapping 注解的路径的 HandlerMapping,不是我们这里关心的。但是可以看出,如果你在 Spring Cloud Gateway 中指定 RequestMapping 路径,会优先于 Spring Cloud Gateway 配置路由。org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping
:这个是 Spring Cloud Gateway 的 HandlerMapping,会读取 Spring Cloud Gateway 配置并生成路由。这个是我们这里要详细分析的。
其实这些 handlerMappings,我们这里肯定走的是 RoutePredicateHandlerMapping
的相关逻辑,所以我们的 Mono 又可以简化成:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.getHandler(exchange)
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
我们来看 RoutePredicateHandlerMapping
,首先这些 handlerMapping 都是继承了抽象类 org.springframework.web.reactive.handler.AbstractHandlerMapping
, 前面我们拼接的 Mono 里面的 getHandler 的实现其实就在这个抽象类中:
public Mono<Object> getHandler(ServerWebExchange exchange) {
//调用抽象方法 getHandlerInternal 获取真正的 Handler
return getHandlerInternal(exchange).map(handler -> {
//这里针对 handler 做一些日志记录
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
// 跨域处理
ServerHttpRequest request = exchange.getRequest();
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ?
this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (config != null) {
config.validateAllowCredentials();
}
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return NO_OP_HANDLER;
}
}
return handler;
});
}
可以看出,其实核心就是每个实现类的 getHandlerInternal(exchange)
方法,所以在我们拼接的 Mono 中,我们会忽略抽象类中的针对 handler 之后的 map 处理。
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.getHandlerInternal(exchange)
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
接下来经过 RoutePredicateHandlerMapping
的 getHandlerInternal(exchange)
方法,我们的 Mono 变成了:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty() //返回 Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
RoutePredicateHandlerMapping.this.lookupRoute(exchange)
根据请求寻找路由,这个我们就不详细展开了,其实就是根据你的 Spring Cloud Gateway 配置,找到合适的路由。接下来我们来看调用对应的 Handler,即 FilteringWebHandler。DispatcherHandler.this.invokeHandler(exchange, handler)
我们这里也不详细展开,我们知道其实就是调用 Handler 的 handle 方法,即 FilteringWebHandler 的 handle 方法,所以 我们的 Mono 变成了:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty())) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
由于调用对应的 Handler,最后返回的是 Mono.empty()
,所以后面的 flatMap 其实不会执行了。所以我们可以将最后的处理结果这一步去掉。所以我们的 Mono 就变成了:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty()))), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
FilteringWebHandler.this.handle(exchange)
其实就是从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters,与全局 GatewayFilters 放到同一个 List 中,并按照这些 GatewayFilter 的顺序排序(可以通过实现 org.springframework.core.Ordered
接口来制定顺序),然后生成 DefaultGatewayFilterChain
即 GatewayFilter 链路。对应的源码是:
public Mono<Void> handle(ServerWebExchange exchange) {
//从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
//与全局 GatewayFilters 放到同一个 List 中
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
//按照这些 GatewayFilter 的顺序排序(可以通过实现 `org.springframework.core.Ordered` 接口来制定顺序)
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
//生成调用链
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
这个 GatewayFilter 调用链和 WebFilter 调用链类似,参考 DefaultGatewayFilterChain
的源码:
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
//如果链路没有结束,则继续链路
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
//这里将 index + 1,也就是调用链路中的下一个 GatewayFilter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
//每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式
return filter.filter(exchange, chain);
}
else {
//到达末尾,链路结束
return Mono.empty(); // complete
}
});
}
所以,经过 DefaultGatewayFilterChain
后,我们的 Mono 就会变成:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(new DefaultGatewayFilterChain(combined).filter(exchange).then(Mono.empty()))), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
再继续展开 DefaultGatewayFilterChain
的链路调用,可以得到:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(
Mono.defer(() -> {
//如果链路没有结束,则继续链路
if (DefaultGatewayFilterChain.this.index < DefaultGatewayFilterChain.this.filters.size()) {
GatewayFilter filter = DefaultGatewayFilterChain.this.filters.get(DefaultGatewayFilterChain.this.index);
//这里将 index + 1,也就是调用链路中的下一个 GatewayFilter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(DefaultGatewayFilterChain.this, DefaultGatewayFilterChain.this.index + 1);
//每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); //链路完成
}
})
.then(Mono.empty()))
), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
这样,就形成了 Spring Cloud Gateway 针对路由请求的完整 Mono 调用链。
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer:
以上是关于SpringCloud升级之路2020.0.x版-27.OpenFeign的生命周期-创建代理的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud升级之路2020.0.x版-7.从Bean到SpringCloud
SpringCloud升级之路2020.0.x版-44.避免链路信息丢失做的设计
SpringCloud升级之路2020.0.x版-41. SpringCloudGateway 基本流程讲解
SpringCloud升级之路2020.0.x版-22.Spring Cloud LoadBalan