agent实现apm上报
Posted go大鸡腿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了agent实现apm上报相关的知识,希望对你有一定的参考价值。
前言
最近刚刚换了工作,进了公司的架构组,有些项目蛮有意思,也是很感兴趣,也会工作之余自行学习,比如说有个自研apm项目
当然在此声明一下,本篇代码属于个人学习编写,并非copy公司代码
对于springcloud,有一套sleuth(主要是traceId+spanId生成)+zipkin(数据统计功能)
skywalking,淘宝的鹰眼,蚂蚁金服sofatrace等等
动手实现apm上报功能
- 首先agent用的是byte-buddy
- 定义一个context方便储存traceid,spanid以及上报的数据
- 只是一个比较简陋的demo,需要后续一些功能优化
代码
编写一个agent
package com.example.demo.agent;
import com.example.demo.interceptor.MyInterceptor;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.matcher.ElementMatchers;
import net.bytebuddy.utility.JavaModule;
import java.lang.instrument.Instrumentation;
public class MyAgent {
public static final ThreadLocal<TraceContext> LOCAL = new ThreadLocal<>();
public static void premain(String agentArgs, Instrumentation inst) {
System.out.println("this is an perform monitor agent.");
AgentBuilder.Transformer transformer = (builder, typeDescription, classLoader) -> {
return builder
.method(ElementMatchers.any()) // 拦截任意方法
.intercept(MethodDelegation.to(MyInterceptor.class)); // 委托
};
AgentBuilder.Listener listener = new AgentBuilder.Listener() {
@Override
public void onTransformation(TypeDescription typeDescription, ClassLoader classLoader, JavaModule module, DynamicType dynamicType) {
}
@Override
public void onIgnored(TypeDescription typeDescription, ClassLoader classLoader, JavaModule module) {
}
@Override
public void onError(String typeName, ClassLoader classLoader, JavaModule module, Throwable throwable) {
}
@Override
public void onComplete(String typeName, ClassLoader classLoader, JavaModule module) {
}
};
new AgentBuilder
.Default()
.type(ElementMatchers.nameStartsWith("com.example.demo").and(ElementMatchers.not(ElementMatchers.nameStartsWith("com.example.demo.agent"))))
// 指定需要拦截的类
.transform(transformer)
.with(listener)
.installOn(inst);
}
}
然后写下agent拦截下这些类之后需要做的操作
package com.example.demo.interceptor;
import com.alibaba.fastjson.JSON;
import com.example.demo.agent.TraceContext;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import static com.example.demo.agent.MyAgent.LOCAL;
public class MyInterceptor {
@RuntimeType
public static Object intercept(@Origin Method method,
@SuperCall Callable<?> callable) throws Exception {
long start = System.currentTimeMillis();
try {
// 原有函数执行
return callable.call();
} finally {
TraceContext context = LOCAL.get();
if(context != null){
context.setMethodType(method.getDeclaringClass().getName()+"."+method.getName());
context.setTime("调用方法时间:"+ (System.currentTimeMillis() - start) +"ms");
//上报操作,rpc,这里还需要修改sql等等打印到logback也上报到收集中心
System.out.println(JSON.toJSONString(context));
context.clear();
}
}
}
}
看下自定义上下文context
package com.example.demo.agent;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author M
*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TraceContext implements Serializable {
private String traceId;
private String spanId;
private String parentSpaceId;
private String time;
private String methodType;
private String data;
//spanId separator
public static final String RPC_ID_SEPARATOR = ".";
/**
* sub-context counter
*/
private static AtomicInteger childContextIndex = new AtomicInteger(0);
/**
* 如果rpc调用的时候需要将spanid传递成这个方法的值
*
* @return
*/
public String nextChildContextId() {
return this.spanId + RPC_ID_SEPARATOR + childContextIndex.incrementAndGet();
}
public static TraceContext cloneContext(TraceContext context) {
if(context==null){
return new TraceContext();
}
return TraceContext.builder()
.spanId(context.nextChildContextId())
.parentSpaceId(context.getSpanId())
.traceId(context.getTraceId())
.build();
}
public void clear() {
//上报之后需要清理之前的一些数据
this.data = "";
}
}
生成agent jar包
这个自行百度,启动的时候加上 -javaagent:D:\\github\\agent-apm\\out\\artifacts\\MyAgent\\MyAgent.jar
单元测试
package com.example.demo.test;
import com.alibaba.fastjson.JSON;
import com.example.demo.agent.TraceContext;
import java.util.UUID;
import static com.example.demo.agent.MyAgent.LOCAL;
public class AgentTest {
private void fun1() throws Exception {
TraceContext context = LOCAL.get();
if (context != null) {
//由于没有集成sleuth,spaceId需要自己模调用的时候简单的自增
String spaceId = context.getSpanId();
//rpc调用的时候需要+1
context.setSpanId(spaceId);
context.setParentSpaceId(spaceId);
context.setData("fun1需要上报的数据");
}
System.out.println("this is fun 1.");
Thread.sleep(500);
}
private void fun2() throws Exception {
TraceContext context = LOCAL.get();
if (context != null) {
//由于没有集成sleuth,spaceId需要自己模调用的时候简单的自增
String spaceId = context.getSpanId();
//rpc调用的时候需要+1
context.setSpanId(spaceId);
context.setParentSpaceId(spaceId);
if (!"".equals(context.getData())) {
System.out.println("fun2可以拿到之前context上传数据:" + context.getData());
}
}
System.out.println("this is fun 2.");
Thread.sleep(500);
//模拟调用rpc
TraceContext rpcContext = TraceContext.cloneContext(context);
System.out.println("上报rpc context:" + JSON.toJSONString(rpcContext));
}
/**
* 可以重写logback append逻辑,打印日志也上报到收集数据的系统
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//实际开发由sleuth来生成traceId
String traceId = UUID.randomUUID().toString();
String spaceId = "0";
TraceContext context = TraceContext.builder()
.spanId(spaceId)
.parentSpaceId("0")
.traceId(traceId)
.build();
//如果是rpc的话,需要使用拦截器,将context塞到LOCAL里面
LOCAL.set(context);
AgentTest test = new AgentTest();
test.fun1();
test.fun2();
//实际开发需要拦截器去删除本地变量
LOCAL.remove();
}
}
打印结果
this is an perform monitor agent.
this is fun 1.
{"data":"fun1需要上报的数据","methodType":"com.example.demo.test.AgentTest.fun1","parentSpaceId":"0","spanId":"0","time":"调用方法时间:503ms","traceId":"add0ac18-9918-4c59-846d-a04802000bae"}
this is fun 2.
上报rpc context:{"parentSpaceId":"0","spanId":"0.1","traceId":"add0ac18-9918-4c59-846d-a04802000bae"}
{"data":"","methodType":"com.example.demo.test.AgentTest.fun2","parentSpaceId":"0","spanId":"0","time":"调用方法时间:507ms","traceId":"add0ac18-9918-4c59-846d-a04802000bae"}
检验结果
有traceid、spanid可以构建一条调用链路,其次的话会打印方法名,执行时间,可以进行后续相应的代码优化,以及加上了需要自定义上报的数据
总结
我们可以看到打印结果,可以看出同一个traceId,以及同一个应用spanid也是一样的,如果说跨应用,spanid需要重新设置,然后进行传递
spanId生成规则
参考下sofatrace 生成规则
就是上面的0.1,0.1.1 spanId
等待优化点
- rpc部分需要重新,从header头拿到context赋值到threadlocal
- mysql打印sql以及执行时间,也需要重写
- 重写logback append逻辑,我们平时打印的日志也需要上报的数据中心
项目链接
以上是关于agent实现apm上报的主要内容,如果未能解决你的问题,请参考以下文章