agent实现apm上报

Posted go大鸡腿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了agent实现apm上报相关的知识,希望对你有一定的参考价值。

前言

最近刚刚换了工作,进了公司的架构组,有些项目蛮有意思,也是很感兴趣,也会工作之余自行学习,比如说有个自研apm项目

当然在此声明一下,本篇代码属于个人学习编写,并非copy公司代码

对于springcloud,有一套sleuth(主要是traceId+spanId生成)+zipkin(数据统计功能)
skywalking,淘宝的鹰眼,蚂蚁金服sofatrace等等

动手实现apm上报功能

  1. 首先agent用的是byte-buddy
  2. 定义一个context方便储存traceid,spanid以及上报的数据
  3. 只是一个比较简陋的demo,需要后续一些功能优化

代码

编写一个agent

package com.example.demo.agent;

import com.example.demo.interceptor.MyInterceptor;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.matcher.ElementMatchers;
import net.bytebuddy.utility.JavaModule;

import java.lang.instrument.Instrumentation;

public class MyAgent {

    public static final ThreadLocal<TraceContext> LOCAL = new ThreadLocal<>();

    public static void premain(String agentArgs, Instrumentation inst) {
        System.out.println("this is an perform monitor agent.");

        AgentBuilder.Transformer transformer = (builder, typeDescription, classLoader) -> {
            return builder
                    .method(ElementMatchers.any()) // 拦截任意方法
                    .intercept(MethodDelegation.to(MyInterceptor.class)); // 委托
        };

        AgentBuilder.Listener listener = new AgentBuilder.Listener() {
            @Override
            public void onTransformation(TypeDescription typeDescription, ClassLoader classLoader, JavaModule module, DynamicType dynamicType) {
            }

            @Override
            public void onIgnored(TypeDescription typeDescription, ClassLoader classLoader, JavaModule module) {
            }

            @Override
            public void onError(String typeName, ClassLoader classLoader, JavaModule module, Throwable throwable) {
            }

            @Override
            public void onComplete(String typeName, ClassLoader classLoader, JavaModule module) {
            }
        };

        new AgentBuilder
                .Default()
                .type(ElementMatchers.nameStartsWith("com.example.demo").and(ElementMatchers.not(ElementMatchers.nameStartsWith("com.example.demo.agent"))))
                // 指定需要拦截的类
                .transform(transformer)
                .with(listener)
                .installOn(inst);
    }
}

然后写下agent拦截下这些类之后需要做的操作

package com.example.demo.interceptor;

import com.alibaba.fastjson.JSON;
import com.example.demo.agent.TraceContext;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.SuperCall;

import java.lang.reflect.Method;
import java.util.concurrent.Callable;

import static com.example.demo.agent.MyAgent.LOCAL;

public class MyInterceptor {

    @RuntimeType
    public static Object intercept(@Origin Method method,
                                   @SuperCall Callable<?> callable) throws Exception {
        long start = System.currentTimeMillis();
        try {
            // 原有函数执行
            return callable.call();
        } finally {
            TraceContext context = LOCAL.get();
            if(context != null){
                context.setMethodType(method.getDeclaringClass().getName()+"."+method.getName());
                context.setTime("调用方法时间:"+ (System.currentTimeMillis() - start) +"ms");
                //上报操作,rpc,这里还需要修改sql等等打印到logback也上报到收集中心
                System.out.println(JSON.toJSONString(context));
                context.clear();
            }
        }
    }

}

看下自定义上下文context

package com.example.demo.agent;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author M
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TraceContext implements Serializable {
    private String traceId;
    private String spanId;
    private String parentSpaceId;
    private String time;
    private String methodType;
    private String data;
    //spanId separator
    public static final String RPC_ID_SEPARATOR = ".";

    /**
     * sub-context counter
     */
    private static AtomicInteger childContextIndex = new AtomicInteger(0);

    /**
     * 如果rpc调用的时候需要将spanid传递成这个方法的值
     *
     * @return
     */
    public String nextChildContextId() {
        return this.spanId + RPC_ID_SEPARATOR + childContextIndex.incrementAndGet();
    }

    public static TraceContext cloneContext(TraceContext context) {
        if(context==null){
            return new TraceContext();
        }
        return TraceContext.builder()
                .spanId(context.nextChildContextId())
                .parentSpaceId(context.getSpanId())
                .traceId(context.getTraceId())
                .build();
    }

    public void clear() {
        //上报之后需要清理之前的一些数据
        this.data = "";
    }
}

生成agent jar包

这个自行百度,启动的时候加上 -javaagent:D:\\github\\agent-apm\\out\\artifacts\\MyAgent\\MyAgent.jar

单元测试

package com.example.demo.test;

import com.alibaba.fastjson.JSON;
import com.example.demo.agent.TraceContext;

import java.util.UUID;

import static com.example.demo.agent.MyAgent.LOCAL;

public class AgentTest {

    private void fun1() throws Exception {
        TraceContext context = LOCAL.get();
        if (context != null) {
            //由于没有集成sleuth,spaceId需要自己模调用的时候简单的自增
            String spaceId = context.getSpanId();
            //rpc调用的时候需要+1
            context.setSpanId(spaceId);
            context.setParentSpaceId(spaceId);
            context.setData("fun1需要上报的数据");
        }
        System.out.println("this is fun 1.");
        Thread.sleep(500);
    }

    private void fun2() throws Exception {
        TraceContext context = LOCAL.get();
        if (context != null) {
            //由于没有集成sleuth,spaceId需要自己模调用的时候简单的自增
            String spaceId = context.getSpanId();
            //rpc调用的时候需要+1
            context.setSpanId(spaceId);
            context.setParentSpaceId(spaceId);
            if (!"".equals(context.getData())) {
                System.out.println("fun2可以拿到之前context上传数据:" + context.getData());
            }
        }
        System.out.println("this is fun 2.");
        Thread.sleep(500);
        //模拟调用rpc
        TraceContext rpcContext = TraceContext.cloneContext(context);
        System.out.println("上报rpc context:" + JSON.toJSONString(rpcContext));

    }


    /**
     * 可以重写logback append逻辑,打印日志也上报到收集数据的系统
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实际开发由sleuth来生成traceId
        String traceId = UUID.randomUUID().toString();
        String spaceId = "0";
        TraceContext context = TraceContext.builder()
                .spanId(spaceId)
                .parentSpaceId("0")
                .traceId(traceId)
                .build();
        //如果是rpc的话,需要使用拦截器,将context塞到LOCAL里面
        LOCAL.set(context);
        AgentTest test = new AgentTest();
        test.fun1();
        test.fun2();
        //实际开发需要拦截器去删除本地变量
        LOCAL.remove();
    }
}

打印结果

this is an perform monitor agent.
this is fun 1.
{"data":"fun1需要上报的数据","methodType":"com.example.demo.test.AgentTest.fun1","parentSpaceId":"0","spanId":"0","time":"调用方法时间:503ms","traceId":"add0ac18-9918-4c59-846d-a04802000bae"}
this is fun 2.
上报rpc context:{"parentSpaceId":"0","spanId":"0.1","traceId":"add0ac18-9918-4c59-846d-a04802000bae"}
{"data":"","methodType":"com.example.demo.test.AgentTest.fun2","parentSpaceId":"0","spanId":"0","time":"调用方法时间:507ms","traceId":"add0ac18-9918-4c59-846d-a04802000bae"}

检验结果

有traceid、spanid可以构建一条调用链路,其次的话会打印方法名,执行时间,可以进行后续相应的代码优化,以及加上了需要自定义上报的数据

总结

我们可以看到打印结果,可以看出同一个traceId,以及同一个应用spanid也是一样的,如果说跨应用,spanid需要重新设置,然后进行传递

spanId生成规则

参考下sofatrace 生成规则

就是上面的0.1,0.1.1 spanId

等待优化点

  1. rpc部分需要重新,从header头拿到context赋值到threadlocal
  2. mysql打印sql以及执行时间,也需要重写
  3. 重写logback append逻辑,我们平时打印的日志也需要上报的数据中心

项目链接

github:https://github.com/dajitui/agent-apm

以上是关于agent实现apm上报的主要内容,如果未能解决你的问题,请参考以下文章

基于Java Agent实现APM

基于Java Agent实现APM

apm中间件切入点

apm中间件切入点

打造一个通用可配置多句柄的数据上报 SDK

Kustomize 生产实战-注入监控 APM Agent