Dubbo数据透传

Posted IT巡游屋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo数据透传相关的知识,希望对你有一定的参考价值。

前言

关于Dubbo框架,可能很多人都知道。Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,它使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。而它的使用场景主要是用于分布式的远程调用。很多朋友在使用Dubbo的过程中,只是关注着业务代码的接口定义以及实现,但是确往往忽略了一个重要的技术点,就是如何在服务间追踪一个调用链。

关于追踪服务的调用链

简单常见的Dubbo服务部署架构

就上图而言,有两个服务的消费者和三个服务的提供者。

消费者:ConsumerA,ConsumerB

提供者:ServiceA,ServiceB,ServiceC

但是这些服务的消费者和提供者都不是单一部署的。消费者部署在了两台服务器C1和C2,提供者分别在P1,P2,P3都部署了。那么现在问题来了,如果ConsumerA中,某一个业务方法,需要分别依靠调用ServiceA、ServiceB和ServiceC才能完成。那么如何才能知道这个请求调用的是P1-P3服务器上部署的哪些服务呢?其中可能的一个结果就是,比如ConsumerA调用了P2中的ServiceA,调用了P3中的ServiceB,最后调用了P1中的serviceC。如下图:

Dubbo数据透传

那么要完成这样子的追踪,通常的思路就是在调用中额外传递一个追踪的数据,比如trackerId。接下来就讲解实现这个追踪数据传递的方法。

方法一

描述

在参数中直接添加追踪数据trackId

代码

/** * @author jogeen * @version 1.0 */public class ModelA { /** * 追踪ID */ private String trackerId;  /** * 业务数据 */ private String businessMessage; }
/** * 接口的定义 * @author jogeen * @version 1.0 */public interface IServiceA { public void method1(ModelA modelA);
}

分析

通过这样传递,在同一个线程调用里,每个服务都能获取相同的trackId,这样就将调用链串联起来。但是这种方式太过于粗暴,因为每一个需要传输的Model,都得加上trackerId这个追踪字段,对代码的侵入性太强。

方法二

描述

在方法一的基础上,将追踪数据trackId提取到一个父类BaseModel中,所有其它的Model都继承至BaseModel

代码

/** * @author jogeen * @version 1.0 */public class BaseModel { /** * 追踪ID */ protected String trackerId;}/** * @author jogeen * @version 1.0 */public class ModelA extends BaseModel{  /** * 业务数据 */ private String businessMessage; }

分析

相较于方法一,侵入性虽然小了许多,但是依然还是破坏了“无侵入性”原则。毕竟,需要使每个Model都去继承BaseModel。在java这种类的单一继承的条件下,方法二也是不合时宜的。

方法三

描述

修改Dubbo源码。如下图,Dubbo本身是具有非常清晰的分层。

代码

/** * 使用ThreadLocal保存现在trackID * @author jogeen * @version 1.0 */public class TraceIdUtils { private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<String>();  public static final String TRACK_ID="trackId";
public static String getTraceId() { return TRACE_ID_THREAD_LOCAL.get(); }
public static void setTraceId(String traceId) { TRACE_ID_THREAD_LOCAL.set(traceId); }}

Cunsumer端的修改点

package org.apache.dubbo.rpc.proxy;/** * InvokerHandler */public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); }
RpcInvocation invocation; if (RpcUtils.hasGeneratedFuture(method)) { Class<?> clazz = method.getDeclaringClass(); String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length()); Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes()); invocation = new RpcInvocation(syncMethod, args); invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } else { invocation = new RpcInvocation(method, args); if (RpcUtils.hasFutureReturnType(method)) { invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } } //从ThreadLocal中将Consumer端的当前线程的trackId取出,放入RpcInvocation invocation.setAttachment(TraceIdUtils.TRACK_ID,TrackUtils.getTraceId()); return invoker.invoke(invocation).recreate();    }}

Provider端的修改点

/** * dubbo protocol support. */public class DubboProtocol extends AbstractProtocol { //省略其它变量定义
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext rpcContext = RpcContext.getContext(); boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false); if (supportServerAsync) { CompletableFuture<Object> future = new CompletableFuture<>(); rpcContext.setAsyncContext(new AsyncContextImpl(future)); } rpcContext.setRemoteAddress(channel.getRemoteAddress()); //从RpcInvocation中取出trackId,存入ThreadLocal中,供线程中其它业务使用。TraceIdUtils.setTraceId(inv.getAttachment(TrackUtils.TRACK_ID)); Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) { return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); } else { return CompletableFuture.completedFuture(result); } } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
//省略类中其它方法。}

分析

该方法对业务代码没有任何侵入性,对于需要添加此功能的业务代码无需做任何修改。对于性能应该没有明显影响,毕竟trackId只是较短的固定长度的字符串。缺点是每次跟随官方升级Dubbo的版本时,需要再做相同的修改。

方法四

描述

使用RpcContext对象,RpcContext对象是Dubbo框架提供的,其本身维护了一次RPC交互调用的上下文信息。查看RpcContext源码,可以看到,它本身也是维护的ThreadLocal。

代码

源码分析
package org.apache.dubbo.rpc;public class RpcContext {
/** * use internal thread local to improve performance */ private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } };
private final Map<String, String> attachments = new HashMap<String, String>(); private final Map<String, Object> values = new HashMap<String, Object>(); //省略其它成员变量 /** * get context. * * @return context */ public static RpcContext getContext() { return LOCAL.get(); }
public static void restoreContext(RpcContext oldContext) { LOCAL.set(oldContext); } //省略其它方法
}

从本质的理论上讲,和方法三的原理是一样的,最终都是通过将追踪数据存入RpcInvocation,然后传递过去。查看源码如下,主要用注释标注的关键点

/** * AbstractClusterInvoker */public abstract class AbstractClusterInvoker<T> implements Invoker<T> { @Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed();
// binding attachments into invocation. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { //(关键点)将RpcContext中的Attachments信息存入RpcInvocation ((RpcInvocation) invocation).addAttachments(contextAttachments); }
List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }}

测试代码
//在Consumer端,调用Service方法前RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1userService.invokeMehtod();
//在Provider端,在Service的实现方法中String traceId = RpcContext.getContext().getAttachment("traceId");//代码2

分析

该方法咋一看没什么大问题,但是却有一个不可忽略的缺点。就是RpcContext中的内容,在进行一次RPC调用之后会被清空。当在一个线程中,有多次RPC方法调用时,只有在调用RpcContext.getContext().setAttachment()方法设置了内容之后,最先调用的一次请求在Provider端能收到RpcContext中上下文的信息,而其它后面的请求都是没有的。所以如果要保证每次RPC调用都携带追踪数据,就都要在调用业务方法之前调用上面代码1的语句,如下。显然,这是不够优雅的。

//在Consumer端,调用Service方法前RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1userService.invokeMehtod1();RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1userService.invokeMehtod2();RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1userService.invokeMehtod3();

方法五

描述

在方法四的基础上,加入Dubbo过滤器的使用。自定义过滤器TrackFilter实现com.alibaba.dubbo.rpc.Filter接口。这样即使RpcContext中的上下文内容在每次PRC调用后会被清理,我们也可以在过滤器中重新填入追踪数据trackId。

代码

自定义

package com.ittheima.util;/** * 使用ThreadLocal保存现在trackID * @author jogeen * @version 1.0 * @date 2018年11月21日 */public class TraceIdUtils {
private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL=new ThreadLocal<>(); public static String getTraceId(){ return TRACE_ID_THREAD_LOCAL.get(); }
public static void setTraceId(String traceId){ TRACE_ID_THREAD_LOCAL.set(traceId); }}自定义日志类
/** * 追踪日志工具类 * @author: jogeen * @version: v1.0 * @date:2018年11月21日 */public class TrackLogUtils {
private static Logger logger = Logger.getLogger(TrackLogUtils.class);
public static void info(String message){ logger.info(TraceIdUtils.getTraceId()+":"+message); }
}

自定义过滤器

/** * dubbo过滤器 * @author: jogeen * @version: v1.0 */public class TraceIdFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String traceId = RpcContext.getContext().getAttachment("traceId");
if ( !StringUtils.isEmpty(traceId) ) { //如果RpcContext携带了traceId,将其作为线程共享变量保存起来。 TraceIdUtils.setTraceId(traceId); } else { //如果RpcContext未携带了traceId,从ThreadLocal中查询 traceId=TraceIdUtils.getTraceId(); if ( StringUtils.isEmpty(traceId) ) { //如果ThreadLocal没有TrackId,说明这是调用链的开端,所以生成一个trackId traceId= UUID.randomUUID().toString(); //保存 TraceIdUtils.setTraceId(traceId); } //重新给RpcContext设置traceId RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId()); } //实际的rpc调用 return invoker.invoke(invocation);

注意:要让我们自定义的过滤器生效,还需要在resource目录下, 添加META-INF/dubbo目录,并且创建com.alibaba.dubbo.rpc.Filter文件

在服务注册的地方,加入自定义的过滤器(此次为注解形式,配置文件方式也是一样)

@Service(filter = "traceIdFilter")public class UserServiceImpl implements UserService 

在服务调用的地方,加入自定义的过滤器(此次为注解形式,配置文件方式也是一样)

@Reference(filter = "traceIdFilter")private UserService userService;

客户端方法调用片段

public String showName(){ TrackLogUtils.info("请求方法第1次"); userService.getName("jogeen1"); TrackLogUtils.info("请求方法第2次"); userService.getName("jogeen2"); TrackLogUtils.info("请求方法第3次"); userService.getName("jogeen3"); return null; }

服务端代码片段

 @Override public String getName(String name) { TrackLogUtils.info("进入请求体name="+name); return "jogeen"; }

结果:

客户端

[INFO ] 2018-12-22 20:37:00,987 3c57472f-be31-4c35-91a9-ea408553df6a:请求方法第1次完成

[INFO ] 2018-12-22 20:37:00,991 3c57472f-be31-4c35-91a9-ea408553df6a:请求方法第2次完成

[INFO ] 2018-12-22 20:37:00,995 3c57472f-be31-4c35-91a9-ea408553df6a:请求方法第3次完成

服务端:

[INFO ] 2018-12-22 20:38:44,490 01301855-12cb-4d2a-b124-4364a775b045:进入请求体name=jogeen1

[INFO ] 2018-12-22 20:38:44,501 01301855-12cb-4d2a-b124-4364a775b045:进入请求体name=jogeen2

[INFO ] 2018-12-22 20:38:44,504 01301855-12cb-4d2a-b124-4364a775b045:进入请求体name=jogeen3

分析

该方法非常完美,每次调用之前都会在filter设置追踪数据,不仅对已有代码无任何侵入性,而且还可以实现多服务之间的连续传递。如A->B->C->D,从服务A开始产生的追踪数据可以一直传递到服务D中。

总结

程序员在编写业务代码的同时,也要向公司架构牛人多学习,他们是如何搭建我们的系统框架,以满足日益复杂的业务需求。作为Dubbo过滤器的使用,还可以用于服务间调用响应的耗时记录,以及服务间的安全检验工作等。作为Dubbo功能的扩充,filter还是很有研究价值。


以上是关于Dubbo数据透传的主要内容,如果未能解决你的问题,请参考以下文章

dubbo traceId透传实现日志链路追踪(基于Filter和RpcContext实现)

全链路参数透传

全链路参数透传

全链路参数透传

全链路参数透传

架构师如何选型分布式业务网关