Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)
Posted 干货满满张哈希
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)相关的知识,希望对你有一定的参考价值。
本系列是 我TM人傻了 系列第五期[捂脸],往期精彩回顾:
本篇文章涉及底层设计以及原理,以及问题定位和可能的问题点,非常深入,篇幅较长,所以拆分成上中下三篇:
- 上:问题简单描述以及 Spring Cloud Gateway 基本结构和流程以及底层原理
- 中:Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的链路追踪以及为何会出现这个问题
- 下:现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其他可能的问题点,以及如何解决
Spring Cloud Sleuth 是如何增加链路信息
通过之前的源码分析,我们知道,在最开始的 TraceWebFilter,我们将 Mono 封装成了一个 MonoWebFilterTrace,它的核心源码是:
@Override
public void subscribe(CoreSubscriber<? super Void> subscriber) {
Context context = contextWithoutInitialSpan(subscriber.currentContext());
Span span = findOrCreateSpan(context);
//将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC
//日志的 MDC 一般都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一个基于 ThreadLocal 实现的 Map
//简单理解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程访问自己的 Map 获取链路信息
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
//将实际的 subscribe 用 Span 所在的 Context 包裹住,结束时关闭 Span
this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
}
//在 scope.close() 之后,会将链路信息从 ThreadLocal 的 Map 中剔除
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) {
//执行的方式必须是不能切换线程,也就是同步的
//因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了
return Attr.RunStyle.SYNC;
}
return super.scanUnsafe(key);
}
WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 请求结束的时候,我们可能想将响应信息,异常信息记录进入 Span 中,就是通过这个类封装实现的。
经过 MonoWebFilterTrace 的封装,由于 Spring-WebFlux 处理请求,其实就是封装成我们上面得出的 Mono 之后进行 subscribe 处理的请求,所以这样,整个内部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只要我们自己不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会丢失的。
我们应用中丢失链路信息的地方
通过查看日志我们发现,启用 RequestBody 缓存的地方,都有链路缺失。这个 RequestBody 缓存我们使用的是 Spring Cloud Gateway 中的 AdaptCachedBodyGlobalFilter
,其核心源码是:
private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest,
Function<ServerHttpRequest, Mono<T>> function) {
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
return
//读取 Body,由于 TCP 拆包,所以需要他们拼接到一起
DataBufferUtils.join(exchange.getRequest().getBody())
//如果没有 Body,则直接返回空 DataBuffer
.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
//decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body
//之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路
.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
}
为何会使用这个 AdaptCachedBodyGlobalFilter
呢?获取请求 Body 是通过 exchange.getRequest().getBody()
获取的,其结果是一个 Flux<DataBuffer>
.请求的 Body 是一次性的,如果你需要请求重试的话,在第一次调用失败的之后,第二次重试的时候,Body 就读取不到了,因为 Flux 已经结束。所以,对于需要重复调用,例如重试,一对多路由转发的情况,需要将请求 Body 缓存起来,就是经过这个 GatewayFilter。但是经过这个 GatewayFilter 之后,链路信息就没了,可以通过以下这个简单项目进行复现(项目地址):
引入依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.6</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置-->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2020.0.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
对所有路径开启 AdaptCachedBodyGlobalFilter
:
@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
@Autowired
private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
@Autowired
private GatewayProperties gatewayProperties;
@PostConstruct
public void init() {
gatewayProperties.getRoutes().forEach(routeDefinition -> {
//对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
});
}
}
配置(我们只有一个路由,将请求转发到 httpbin.org 这个 http 请求测试网站):
server:
port: 8181
spring:
application:
name: apiGateway
cloud:
gateway:
httpclient:
connect-timeout: 500
response-timeout: 60000
routes:
- id: first_route
uri: http://httpbin.org
predicates:
- Path=/httpbin/**
filters:
- StripPrefix=1
添加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter
之前,一个在 AdaptCachedBodyGlobalFilter
之后。这两个 Filter 非常简单,只是打一行日志。
@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {
public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("before AdaptCachedBodyGlobalFilter");
return chain.filter(exchange);
}
@Override
public int getOrder() {
return ORDER;
}
}
@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {
public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("after AdaptCachedBodyGlobalFilter");
return chain.filter(exchange);
}
@Override
public int getOrder() {
return ORDER;
}
}
最后指定 Log4j2 的输出格式中包含链路信息,就像系列文章开头中指定的那样。
启动这个应用,之后访问 http://127.0.0.1:8181/httpbin/anything
,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:
2021-09-08 06:32:35.457 INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474 INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter
为何链路信息会丢失
我们来看经过 AdaptCachedBodyGlobalFilter 之后,我们前面拼的 Mono 链路会变成什么样:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(
Mono.defer(() -> {
//省略在 AdaptCachedBodyGlobalFilter 前面的链路嵌套
//读取 Body,由于 TCP 拆包,所以需要他们拼接到一起
DataBufferUtils.join(exchange.getRequest().getBody())
//如果没有 Body,则直接返回空 DataBuffer
.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
//decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body
//之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路
.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
})
.then(Mono.empty()))
), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
其中 DataBufferUtils.join(exchange.getRequest().getBody())
其实是一个 FluxReceive,这里我们可以理解为:提交一个尝试读取请求 Body 的任务,将之后的 GatewayFilter 的链路处理加到在读取完 Body 之后的回调当中,提交这个任务后,立刻返回。这么看可能比较复杂,我们用一个类似的例子类比下:
//首先我们创建一个新的 Span
Span span = tracer.newTrace();
//声明一个类似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperator
class MonoWebFilterTrace<T> extends MonoOperator<T, T> {
protected MonoWebFilterTrace(Mono<? extends T> source) {
super(source);
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
//将 subscribe 用 span 包裹
try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {
source.subscribe(actual);
//在将要关闭 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志
log.info("stopped");
}
}
}
Mono.defer(() -> new MonoWebFilterTrace(
Mono.fromRunnable(() -> {
log.info("first");
})
//模拟 FluxReceive
.then(Mono.delay(Duration.ofSeconds(1))
.doOnSuccess(longSignal -> log.info(longSignal))))
).subscribe(aLong -> log.info(aLong));
Mono.delay
和 FluxReceive 表现类似,都是异步切换线程池执行。执行上面的代码,我们可以从日志上面就能看出来:
2021-09-08 07:12:45.236 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0
在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,但是由于 Span 已经结束,从 ThreadLocal 的 Map 中已经移除了链路信息,所以日志中还是没有链路信息。
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer:
以上是关于Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Gateway 没有链路信息,我 TM 人傻了(下)
Spring Cloud Gateway 没有链路信息,我 TM 人傻了(上)
Spring Cloud Gateway 没有链路信息,我 TM 人傻了(下)
Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)
Alibaba Sentinel对接Spring Cloud Gateway关于不显示API管理及请求链路的坑附带解决方案