skywalking plugin 开发初探 ONS plugin 实践

Posted 汪小哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了skywalking plugin 开发初探 ONS plugin 实践相关的知识,希望对你有一定的参考价值。

最近支持一下 ONS 内部skywalking 增强支持,RocketMQ 开源版本支持skywalking ,阿里云上的skywalking不支持 简单的实现一下,了解整个实现的逻辑。

一、开发指南

参考文档

官方开发指南
https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/java-plugin-development-guide/

https://skywalking.apache.org/zh/2019-01-21-agent-plugin-practice/
非官方文档 很详细
https://www.jianshu.com/p/e5fb4d46c618
https://blog.csdn.net/kaiyuanshe/article/details/109685249

1.1 核心参数传递 ContextCarrier

在客户端,创建一个新的 traceId 所有信息放到HTTP heads、Dubbo attachments 或者Kafka messages。
通过服务调用,traceId 传递。
skywalking 中使用 ContextCarrier 传递 信息
比如: 这里通过 Mq 用户自定义属性传递 skywalking 中的携带信息

Properties userProperties = message.getUserProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) 
    next = next.next();
    if (!StringUtil.isEmpty(next.getHeadValue())) 
        userProperties.setProperty(next.getHeadKey(), next.getHeadValue());
    

1.1.1 跨线程传递 ContextSnapshot

听名字 就有意思 上下文快照,直接粘贴官方的文档 。
Besides cross-process tracing, cross-thread tracing has to be supported as well. For instance, both async process (in-memory MQ) and batch process are common in Java. Cross-process and cross-thread tracing are very similar in that they both require propagating context, except that cross-thread tracing does not require serialization.
Here are the three steps on cross-thread propagation:

  1. Use ContextManager#capture to get the ContextSnapshot object.
  2. Let the sub-thread access the ContextSnapshot through method arguments or being carried by existing arguments
  3. Use ContextManager#continued in sub-thread.

2. 字节码增强

https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/java-plugin-development-guide/

字节码增强使用 bytebuddy

可以先去官方的Java agent 代码下载下来湫湫一下子. 很多的例子

 <dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-agent-core</artifactId>
    <version>$sky-agent-version</version>
    <scope>provided</scope>
</dependency>

ClassInstanceMethodsEnhancePluginDefine �
比如增强这个类的实例 com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl


package com.aliyun.openservices.ons.api;

public interface Consumer extends Admin 
    void subscribe(String var1, String var2, MessageListener var3);

    void subscribe(String var1, MessageSelector var2, MessageListener var3);

    void unsubscribe(String var1);

注意这里为什么不能直接增强 MessageListener? 如下写法为lambda 表达式 skywalking不支持
https://blog.csdn.net/weixin_39850981/article/details/118846538 bytebuddy 支持官方没有实现
所以改为覆盖参数进行增强
consumer.subscribe(topic, tag, (msg, context) ->
)

2.1 定义你对哪个class 增强?

NameMatch.byName(ENHANCE_CLASS);

2.2 定义你对哪个方法增强 &构造函数?

ProducerConstructorInterceptor & InstanceMethodsAroundInterceptor 信息订阅
如下为 阿里云ONS 消费者的定义增强

public class ConsumerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine 

    private static final String ENHANCE_CLASS = "com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl";
    private static final String CONSUMER_MESSAGE_METHOD = "subscribe";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerImplInterceptor";

    public static final String CONSTRUCTOR_INTERCEPT_TYPE = "java.util.Properties";

    public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerConstructorInterceptor";

    @Override
    protected ClassMatch enhanceClass() 
        return NameMatch.byName(ENHANCE_CLASS);
    

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() 
        return new ConstructorInterceptPoint[]
                new ConstructorInterceptPoint() 
                    @Override
                    public ElementMatcher<MethodDescription> getConstructorMatcher() 
                        return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
                    

                    @Override
                    public String getConstructorInterceptor() 
                        return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
                    
                

        ;
    

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
        return new InstanceMethodsInterceptPoint[]
                new InstanceMethodsInterceptPoint() 
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() 
                        return named(CONSUMER_MESSAGE_METHOD)
                                .and(takesArgumentWithType(1, "java.lang.String"))
                                .and(takesArgumentWithType(2, "com.aliyun.openservices.ons.api.MessageListener"))
                                ;
                    

                    @Override
                    public String getMethodsInterceptor() 
                        return INTERCEPTOR_CLASS;
                    

                    @Override
                    public boolean isOverrideArgs() 
                        return true;
                    
                
        ;
    

2.3 定义增强的处理逻辑

2.3.1 消费者构造函数处理获取参数
/**
 * @link  com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl
 *
 * @author wangji
 * @date 2022-04-12 09:46
 */
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor 
    private static final ILog LOGGER = LogManager.getLogger(ProducerConstructorInterceptor.class);
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable 
        try 
            Properties properties = (Properties) allArguments[0];
            ConfigProducerPropertiesCache cache = new ConfigProducerPropertiesCache();
            cache.setNameServer(properties.getProperty(PropertyKeyConst.NAMESRV_ADDR));
            cache.setGroupId(properties.getProperty(PropertyKeyConst.GROUP_ID));
            cache.setmQType(properties.getProperty(PropertyKeyConst.MQType));
            objInst.setSkyWalkingDynamicField(cache);
         catch (Exception e) 
            LOGGER.error("ProducerImpl ",e);
        
    

2.3.2 消费者实现处理进行参数替换

consumer.subscribe(topic, tag, (msg, context) ->
)
这里为什么进行替换参数是由于内部写法为lambda表达式 so 采用替换参数,debug 看日志的时候发现一直增强不行.

/**
 * @link  com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl
 *
 * @author wangji
 * @date 2022-04-09 18:03
 */
public class ConsumerImplInterceptor implements InstanceMethodsAroundInterceptor 

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable 
        ConfigConsumerPropertiesCache configConsumerPropertiesCache = (ConfigConsumerPropertiesCache) objInst.getSkyWalkingDynamicField();
        Object messageListener = allArguments[2];
        if (null != messageListener) 
            if (messageListener instanceof EnhancedInstance) 

             else if (messageListener instanceof MessageListener) 
                MessageListenerCache messageListenerCache = new MessageListenerCache();
                messageListenerCache.setMessageListener((MessageListener) messageListener);

                messageListenerCache.setConsumerPropertiesCache(configConsumerPropertiesCache);
                allArguments[2] = new MessageListenerAdapterInterceptor(messageListenerCache);
            
        
    

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable 
        return ret;
    

    @Override
    public void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, Throwable throwable) 
    


2.3.3 消费者Listener 监听逻辑

从userMessage 中获取 上下文信息,设置 ContextCarrier 信息 创建 Span

/**
 * 消息监听处理..
 *
 * @author wangji
 * @date 2022-04-11 18:42
 */
public class MessageListenerAdapterInterceptor implements MessageListener, EnhancedInstance 

    public static final String CONSUMER_OPERATION_NAME_PREFIX = "ALiYunOns/";

    private MessageListenerCache cache;

    public MessageListenerAdapterInterceptor(MessageListenerCache cache) 
        this.cache = cache;
    

    @Override
    public Action consume(Message message, ConsumeContext context) 
        ContextCarrier contextCarrier = getContextCarrierFromMessage(message);
        ConfigConsumerPropertiesCache consumerPropertiesCache = cache.getConsumerPropertiesCache();
        AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + message
                .getTopic() + "/Consumer", contextCarrier);

        span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
        span.setPeer(consumerPropertiesCache.getNameServer());
        SpanLayer.asMQ(span);
        ContextManager.extract(getContextCarrierFromMessage(message));

        StringTag groupIdTag = (StringTag) Tags.ofKey(PropertyKeyConst.GROUP_ID);
        groupIdTag.set(span,consumerPropertiesCache.getGroupId());

        StringTag messageModel = (StringTag) Tags.ofKey(PropertyKeyConst.MessageModel);
        messageModel.set(span,consumerPropertiesCache.getMessageModel());

        StringTag msgIdTag = (StringTag) Tags.ofKey(Message.SystemPropKey.MSGID);
        msgIdTag.set(span,message.getMsgID());

        StringTag tagTag = (StringTag) Tags.ofKey(Message.SystemPropKey.TAG);
        tagTag.set(span,message.getTag());
        if(message.getKey() !=null && message.getKey().length()>0)
            StringTag keyTag = (StringTag) Tags.ofKey(Message.SystemPropKey.KEY);
            keyTag.set(span,message.getKey());
        

        try 
            Action consume = cache.getMessageListener().consume(message, context);
            if (consume != null) 
                AbstractSpan activeSpan = ContextManager.activeSpan();
                Tags.MQ_STATUS.set(activeSpan, consume.name());
                if (consume != Action.CommitMessage) 
                    activeSpan.errorOccurred();
                
            
            return consume;
         catch (Throwable t) 
            ContextManager.activeSpan().log(t);
            if (t instanceof RuntimeException) 
                throw (RuntimeException) t;
             else 
                throw new RuntimeException(t);
            
         finally 
            ContextManager.stopSpan();
        

    

    @Override
    public Object getSkyWalkingDynamicField() 
        return cache;
    

    @Override
    public void setSkyWalkingDynamicField(Object o) 

    

    private ContextCarrier getContextCarrierFromMessage(Message message) 
        ContextCarrier contextCarrier = new ContextCarrier();

        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) 
            next = next.next();
            next.setHeadValue(message.getUserProperties(next.getHeadKey()));
        

        return contextCarrier;
    

3、打包

https://blog.csdn.net/kaiyuanshe/article/details/109685249

3.1 定义增强的配置

src/main/resources/skywalking-plugin.def

# Key=value的形式
# key随便写;value是Instrumentation类的包名类名全路径
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ProducerInstrumentation
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ConsumerImplInstrumentation

3.2 maven shade 定义

因为 skywalking里面增对 bytebuddy 进行了shade 所以也需要处理一下,然后扔进plugin 里面就可以了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gongdao</groupId>
    <artifactId>apm-aliyun-mq-ons-plugin</artifactId>
    <version>2022-04-15-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <sky-agent-version>8.9.0</sky-agent-version>
        <bytebuddy.version>1.11.18</bytebuddy.version>
        <shade.package>org.apache.skywalking.apm.dependencies</shade.package>
        <shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
        <shade.net.bytebuddy.target>$shade.package.$shade.net.bytebuddy.source</shade.net.bytebuddy.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>apm-agent-core</artifactId>
            <version>$sky-agent-version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>net.bytebuddy</groupId>
            <artifactId>byte-buddy</artifactId>
            <version>$bytebuddy.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>java-agent-util</artifactId>
            <version>$sky-agent-version</version>
            skywalking plugin 开发初探 ONS plugin 实践

浅谈skywalking的spring-webflux-plugin

浅谈skywalking的spring-webflux-plugin

Skywalking系列博客3-Java Agent插件

Skywalking系列博客3-Java Agent插件

Skywalking Java 插件开发太简单了