浅谈skywalking的spring-webflux-plugin

Posted 李翰林

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈skywalking的spring-webflux-plugin相关的知识,希望对你有一定的参考价值。

本文参考原文-http://bjbsair.com/2020-03-22/tech-info/5100/

本文主要研究一下skywalking的spring-webflux-plugin

技术图片

DispatcherHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[]{  
 ? ? ? ? ?  new InstanceMethodsInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getMethodsMatcher() {  
 ? ? ? ? ? ? ? ? ?  return named("handle");  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getMethodsInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public boolean isOverrideArgs() {  
 ? ? ? ? ? ? ? ? ?  return false;  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.reactive.DispatcherHandler");  
 ?  }  
}
  • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

DispatcherHandlerHandleMethodInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
 ?  private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
?  
 ?  @Override  
 ?  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? MethodInterceptResult result) throws Throwable {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Object ret) throws Throwable {  
 ? ? ?  EnhancedInstance instance = getInstance(allArguments[0]);  
?  
 ? ? ?  ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
?  
 ? ? ?  ContextCarrier carrier = new ContextCarrier();  
 ? ? ?  CarrierItem next = carrier.items();  
 ? ? ?  HttpHeaders headers = exchange.getRequest().getHeaders();  
 ? ? ?  while (next.hasNext()) {  
 ? ? ? ? ?  next = next.next();  
 ? ? ? ? ?  List<String> header = headers.get(next.getHeadKey());  
 ? ? ? ? ?  if (header != null && header.size() > 0) {  
 ? ? ? ? ? ? ?  next.setHeadValue(header.get(0));  
 ? ? ? ? ?  }  
 ? ? ?  }  
?  
 ? ? ?  AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
 ? ? ?  span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
 ? ? ?  SpanLayer.asHttp(span);  
 ? ? ?  Tags.URL.set(span, exchange.getRequest().getURI().toString());  
 ? ? ?  HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
 ? ? ?  instance.setSkyWalkingDynamicField(ContextManager.capture());  
 ? ? ?  span.prepareForAsync();  
 ? ? ?  ContextManager.stopSpan(span);  
?  
 ? ? ?  return ((Mono) ret).doFinally(s -> {  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
 ? ? ? ? ? ? ?  if (pathPattern != null) {  
 ? ? ? ? ? ? ? ? ?  span.setOperationName(((PathPattern) pathPattern).getPatternString());  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
 ? ? ? ? ? ? ?  // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
 ? ? ? ? ? ? ?  if (httpStatus != null) {  
 ? ? ? ? ? ? ? ? ?  Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
 ? ? ? ? ? ? ? ? ?  if (httpStatus.isError()) {  
 ? ? ? ? ? ? ? ? ? ? ?  span.errorOccurred();  
 ? ? ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  } finally {  
 ? ? ? ? ? ? ?  span.asyncFinish();  
 ? ? ? ? ?  }  
 ? ? ?  });  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Class<?>[] argumentsTypes, Throwable t) {  
 ?  }  
?  
 ?  public static EnhancedInstance getInstance(Object o) {  
 ? ? ?  EnhancedInstance instance = null;  
 ? ? ?  if (o instanceof DefaultServerWebExchange) {  
 ? ? ? ? ?  instance = (EnhancedInstance) o;  
 ? ? ?  } else if (o instanceof ServerWebExchangeDecorator) {  
 ? ? ? ? ?  ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
 ? ? ? ? ?  return getInstance(delegate);  
 ? ? ?  }  
 ? ? ?  return instance;  
 ?  }  
?  
}
  • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

ServerWebExchangeInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[]{  
 ? ? ? ? ?  new ConstructorInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getConstructorMatcher() {  
 ? ? ? ? ? ? ? ? ?  return any();  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getConstructorInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
 ?  }  
}
  • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

ServerWebExchangeConstructorInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
 ?  @Override  
 ?  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
 ?  }  
}
  • ServerWebExchangeConstructorInterceptor目前还是空实现

小结

DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

doc

本文主要研究一下skywalking的spring-webflux-plugin

技术图片

DispatcherHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[]{  
 ? ? ? ? ?  new InstanceMethodsInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getMethodsMatcher() {  
 ? ? ? ? ? ? ? ? ?  return named("handle");  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getMethodsInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public boolean isOverrideArgs() {  
 ? ? ? ? ? ? ? ? ?  return false;  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.reactive.DispatcherHandler");  
 ?  }  
}
  • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

DispatcherHandlerHandleMethodInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
 ?  private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
?  
 ?  @Override  
 ?  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? MethodInterceptResult result) throws Throwable {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Object ret) throws Throwable {  
 ? ? ?  EnhancedInstance instance = getInstance(allArguments[0]);  
?  
 ? ? ?  ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
?  
 ? ? ?  ContextCarrier carrier = new ContextCarrier();  
 ? ? ?  CarrierItem next = carrier.items();  
 ? ? ?  HttpHeaders headers = exchange.getRequest().getHeaders();  
 ? ? ?  while (next.hasNext()) {  
 ? ? ? ? ?  next = next.next();  
 ? ? ? ? ?  List<String> header = headers.get(next.getHeadKey());  
 ? ? ? ? ?  if (header != null && header.size() > 0) {  
 ? ? ? ? ? ? ?  next.setHeadValue(header.get(0));  
 ? ? ? ? ?  }  
 ? ? ?  }  
?  
 ? ? ?  AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
 ? ? ?  span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
 ? ? ?  SpanLayer.asHttp(span);  
 ? ? ?  Tags.URL.set(span, exchange.getRequest().getURI().toString());  
 ? ? ?  HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
 ? ? ?  instance.setSkyWalkingDynamicField(ContextManager.capture());  
 ? ? ?  span.prepareForAsync();  
 ? ? ?  ContextManager.stopSpan(span);  
?  
 ? ? ?  return ((Mono) ret).doFinally(s -> {  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
 ? ? ? ? ? ? ?  if (pathPattern != null) {  
 ? ? ? ? ? ? ? ? ?  span.setOperationName(((PathPattern) pathPattern).getPatternString());  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
 ? ? ? ? ? ? ?  // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
 ? ? ? ? ? ? ?  if (httpStatus != null) {  
 ? ? ? ? ? ? ? ? ?  Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
 ? ? ? ? ? ? ? ? ?  if (httpStatus.isError()) {  
 ? ? ? ? ? ? ? ? ? ? ?  span.errorOccurred();  
 ? ? ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  } finally {  
 ? ? ? ? ? ? ?  span.asyncFinish();  
 ? ? ? ? ?  }  
 ? ? ?  });  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Class<?>[] argumentsTypes, Throwable t) {  
 ?  }  
?  
 ?  public static EnhancedInstance getInstance(Object o) {  
 ? ? ?  EnhancedInstance instance = null;  
 ? ? ?  if (o instanceof DefaultServerWebExchange) {  
 ? ? ? ? ?  instance = (EnhancedInstance) o;  
 ? ? ?  } else if (o instanceof ServerWebExchangeDecorator) {  
 ? ? ? ? ?  ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
 ? ? ? ? ?  return getInstance(delegate);  
 ? ? ?  }  
 ? ? ?  return instance;  
 ?  }  
?  
}
  • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

ServerWebExchangeInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[]{  
 ? ? ? ? ?  new ConstructorInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getConstructorMatcher() {  
 ? ? ? ? ? ? ? ? ?  return any();  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getConstructorInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
 ?  }  
}
  • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

ServerWebExchangeConstructorInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
 ?  @Override  
 ?  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
 ?  }  
}
  • ServerWebExchangeConstructorInterceptor目前还是空实现

小结

DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

doc

本文主要研究一下skywalking的spring-webflux-plugin

技术图片

DispatcherHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[]{  
 ? ? ? ? ?  new InstanceMethodsInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getMethodsMatcher() {  
 ? ? ? ? ? ? ? ? ?  return named("handle");  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getMethodsInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public boolean isOverrideArgs() {  
 ? ? ? ? ? ? ? ? ?  return false;  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.reactive.DispatcherHandler");  
 ?  }  
}
  • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

DispatcherHandlerHandleMethodInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
 ?  private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
?  
 ?  @Override  
 ?  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? MethodInterceptResult result) throws Throwable {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Object ret) throws Throwable {  
 ? ? ?  EnhancedInstance instance = getInstance(allArguments[0]);  
?  
 ? ? ?  ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
?  
 ? ? ?  ContextCarrier carrier = new ContextCarrier();  
 ? ? ?  CarrierItem next = carrier.items();  
 ? ? ?  HttpHeaders headers = exchange.getRequest().getHeaders();  
 ? ? ?  while (next.hasNext()) {  
 ? ? ? ? ?  next = next.next();  
 ? ? ? ? ?  List<String> header = headers.get(next.getHeadKey());  
 ? ? ? ? ?  if (header != null && header.size() > 0) {  
 ? ? ? ? ? ? ?  next.setHeadValue(header.get(0));  
 ? ? ? ? ?  }  
 ? ? ?  }  
?  
 ? ? ?  AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
 ? ? ?  span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
 ? ? ?  SpanLayer.asHttp(span);  
 ? ? ?  Tags.URL.set(span, exchange.getRequest().getURI().toString());  
 ? ? ?  HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
 ? ? ?  instance.setSkyWalkingDynamicField(ContextManager.capture());  
 ? ? ?  span.prepareForAsync();  
 ? ? ?  ContextManager.stopSpan(span);  
?  
 ? ? ?  return ((Mono) ret).doFinally(s -> {  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
 ? ? ? ? ? ? ?  if (pathPattern != null) {  
 ? ? ? ? ? ? ? ? ?  span.setOperationName(((PathPattern) pathPattern).getPatternString());  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
 ? ? ? ? ? ? ?  // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
 ? ? ? ? ? ? ?  if (httpStatus != null) {  
 ? ? ? ? ? ? ? ? ?  Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
 ? ? ? ? ? ? ? ? ?  if (httpStatus.isError()) {  
 ? ? ? ? ? ? ? ? ? ? ?  span.errorOccurred();  
 ? ? ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  } finally {  
 ? ? ? ? ? ? ?  span.asyncFinish();  
 ? ? ? ? ?  }  
 ? ? ?  });  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Class<?>[] argumentsTypes, Throwable t) {  
 ?  }  
?  
 ?  public static EnhancedInstance getInstance(Object o) {  
 ? ? ?  EnhancedInstance instance = null;  
 ? ? ?  if (o instanceof DefaultServerWebExchange) {  
 ? ? ? ? ?  instance = (EnhancedInstance) o;  
 ? ? ?  } else if (o instanceof ServerWebExchangeDecorator) {  
 ? ? ? ? ?  ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
 ? ? ? ? ?  return getInstance(delegate);  
 ? ? ?  }  
 ? ? ?  return instance;  
 ?  }  
?  
}
  • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

ServerWebExchangeInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[]{  
 ? ? ? ? ?  new ConstructorInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getConstructorMatcher() {  
 ? ? ? ? ? ? ? ? ?  return any();  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getConstructorInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
 ?  }  
}
  • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

ServerWebExchangeConstructorInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
 ?  @Override  
 ?  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
 ?  }  
}
  • ServerWebExchangeConstructorInterceptor目前还是空实现

小结

DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

doc

本文主要研究一下skywalking的spring-webflux-plugin

技术图片

DispatcherHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[]{  
 ? ? ? ? ?  new InstanceMethodsInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getMethodsMatcher() {  
 ? ? ? ? ? ? ? ? ?  return named("handle");  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getMethodsInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public boolean isOverrideArgs() {  
 ? ? ? ? ? ? ? ? ?  return false;  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.reactive.DispatcherHandler");  
 ?  }  
}
  • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

DispatcherHandlerHandleMethodInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
 ?  private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
?  
 ?  @Override  
 ?  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? MethodInterceptResult result) throws Throwable {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Object ret) throws Throwable {  
 ? ? ?  EnhancedInstance instance = getInstance(allArguments[0]);  
?  
 ? ? ?  ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
?  
 ? ? ?  ContextCarrier carrier = new ContextCarrier();  
 ? ? ?  CarrierItem next = carrier.items();  
 ? ? ?  HttpHeaders headers = exchange.getRequest().getHeaders();  
 ? ? ?  while (next.hasNext()) {  
 ? ? ? ? ?  next = next.next();  
 ? ? ? ? ?  List<String> header = headers.get(next.getHeadKey());  
 ? ? ? ? ?  if (header != null && header.size() > 0) {  
 ? ? ? ? ? ? ?  next.setHeadValue(header.get(0));  
 ? ? ? ? ?  }  
 ? ? ?  }  
?  
 ? ? ?  AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
 ? ? ?  span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
 ? ? ?  SpanLayer.asHttp(span);  
 ? ? ?  Tags.URL.set(span, exchange.getRequest().getURI().toString());  
 ? ? ?  HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
 ? ? ?  instance.setSkyWalkingDynamicField(ContextManager.capture());  
 ? ? ?  span.prepareForAsync();  
 ? ? ?  ContextManager.stopSpan(span);  
?  
 ? ? ?  return ((Mono) ret).doFinally(s -> {  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
 ? ? ? ? ? ? ?  if (pathPattern != null) {  
 ? ? ? ? ? ? ? ? ?  span.setOperationName(((PathPattern) pathPattern).getPatternString());  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
 ? ? ? ? ? ? ?  // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
 ? ? ? ? ? ? ?  if (httpStatus != null) {  
 ? ? ? ? ? ? ? ? ?  Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
 ? ? ? ? ? ? ? ? ?  if (httpStatus.isError()) {  
 ? ? ? ? ? ? ? ? ? ? ?  span.errorOccurred();  
 ? ? ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  } finally {  
 ? ? ? ? ? ? ?  span.asyncFinish();  
 ? ? ? ? ?  }  
 ? ? ?  });  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Class<?>[] argumentsTypes, Throwable t) {  
 ?  }  
?  
 ?  public static EnhancedInstance getInstance(Object o) {  
 ? ? ?  EnhancedInstance instance = null;  
 ? ? ?  if (o instanceof DefaultServerWebExchange) {  
 ? ? ? ? ?  instance = (EnhancedInstance) o;  
 ? ? ?  } else if (o instanceof ServerWebExchangeDecorator) {  
 ? ? ? ? ?  ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
 ? ? ? ? ?  return getInstance(delegate);  
 ? ? ?  }  
 ? ? ?  return instance;  
 ?  }  
?  
}
  • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

ServerWebExchangeInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[]{  
 ? ? ? ? ?  new ConstructorInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getConstructorMatcher() {  
 ? ? ? ? ? ? ? ? ?  return any();  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getConstructorInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
 ?  }  
}
  • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

ServerWebExchangeConstructorInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
 ?  @Override  
 ?  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
 ?  }  
}
  • ServerWebExchangeConstructorInterceptor目前还是空实现

小结

DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

doc

本文主要研究一下skywalking的spring-webflux-plugin

技术图片

DispatcherHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[]{  
 ? ? ? ? ?  new InstanceMethodsInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getMethodsMatcher() {  
 ? ? ? ? ? ? ? ? ?  return named("handle");  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getMethodsInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public boolean isOverrideArgs() {  
 ? ? ? ? ? ? ? ? ?  return false;  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.reactive.DispatcherHandler");  
 ?  }  
}
  • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

DispatcherHandlerHandleMethodInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
 ?  private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
?  
 ?  @Override  
 ?  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? MethodInterceptResult result) throws Throwable {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Object ret) throws Throwable {  
 ? ? ?  EnhancedInstance instance = getInstance(allArguments[0]);  
?  
 ? ? ?  ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
?  
 ? ? ?  ContextCarrier carrier = new ContextCarrier();  
 ? ? ?  CarrierItem next = carrier.items();  
 ? ? ?  HttpHeaders headers = exchange.getRequest().getHeaders();  
 ? ? ?  while (next.hasNext()) {  
 ? ? ? ? ?  next = next.next();  
 ? ? ? ? ?  List<String> header = headers.get(next.getHeadKey());  
 ? ? ? ? ?  if (header != null && header.size() > 0) {  
 ? ? ? ? ? ? ?  next.setHeadValue(header.get(0));  
 ? ? ? ? ?  }  
 ? ? ?  }  
?  
 ? ? ?  AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
 ? ? ?  span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
 ? ? ?  SpanLayer.asHttp(span);  
 ? ? ?  Tags.URL.set(span, exchange.getRequest().getURI().toString());  
 ? ? ?  HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
 ? ? ?  instance.setSkyWalkingDynamicField(ContextManager.capture());  
 ? ? ?  span.prepareForAsync();  
 ? ? ?  ContextManager.stopSpan(span);  
?  
 ? ? ?  return ((Mono) ret).doFinally(s -> {  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
 ? ? ? ? ? ? ?  if (pathPattern != null) {  
 ? ? ? ? ? ? ? ? ?  span.setOperationName(((PathPattern) pathPattern).getPatternString());  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
 ? ? ? ? ? ? ?  // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
 ? ? ? ? ? ? ?  if (httpStatus != null) {  
 ? ? ? ? ? ? ? ? ?  Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
 ? ? ? ? ? ? ? ? ?  if (httpStatus.isError()) {  
 ? ? ? ? ? ? ? ? ? ? ?  span.errorOccurred();  
 ? ? ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  } finally {  
 ? ? ? ? ? ? ?  span.asyncFinish();  
 ? ? ? ? ?  }  
 ? ? ?  });  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  Class<?>[] argumentsTypes, Throwable t) {  
 ?  }  
?  
 ?  public static EnhancedInstance getInstance(Object o) {  
 ? ? ?  EnhancedInstance instance = null;  
 ? ? ?  if (o instanceof DefaultServerWebExchange) {  
 ? ? ? ? ?  instance = (EnhancedInstance) o;  
 ? ? ?  } else if (o instanceof ServerWebExchangeDecorator) {  
 ? ? ? ? ?  ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
 ? ? ? ? ?  return getInstance(delegate);  
 ? ? ?  }  
 ? ? ?  return instance;  
 ?  }  
?  
}
  • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

ServerWebExchangeInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
 ?  @Override  
 ?  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
 ? ? ?  return new ConstructorInterceptPoint[]{  
 ? ? ? ? ?  new ConstructorInterceptPoint() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public ElementMatcher<MethodDescription> getConstructorMatcher() {  
 ? ? ? ? ? ? ? ? ?  return any();  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public String getConstructorInterceptor() {  
 ? ? ? ? ? ? ? ? ?  return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  };  
 ?  }  
?  
 ?  @Override  
 ?  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
 ? ? ?  return new InstanceMethodsInterceptPoint[0];  
 ?  }  
?  
 ?  @Override  
 ?  protected ClassMatch enhanceClass() {  
 ? ? ?  return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
 ?  }  
}
  • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

ServerWebExchangeConstructorInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
 ?  @Override  
 ?  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
 ?  }  
}
  • ServerWebExchangeConstructorInterceptor目前还是空实现

小结

DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

doc

  • DispatcherHandlerInstrumentation
  • ServerWebExchangeInstrumentation

以上是关于浅谈skywalking的spring-webflux-plugin的主要内容,如果未能解决你的问题,请参考以下文章

浅谈skywalking的spring-webflux-plugin

云原生月报-2020年12月

skywalking

skywalking 安装和使用

k8s安装skywalking

微服务链路追踪SkyWalking