A first look at SkyWalking plugin development: an ONS plugin in practice
Posted by 汪小哥
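I recently added SkyWalking support for ONS in an internal project. SkyWalking supports the open-source RocketMQ client, but not the Alibaba Cloud ONS client, so I wrote a simple implementation to understand how the whole mechanism works.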
1. Development guide
Reference documentation:
https://skywalking.apache.org/zh/2019-01-21-agent-plugin-practice/
Unofficial documentation, very detailed:
https://www.jianshu.com/p/e5fb4d46c618
https://blog.csdn.net/kaiyuanshe/article/details/109685249
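1.1 Core parameter passing: ContextCarrier
On the client side, a new traceId is created and all the trace information is put into HTTP headers, Dubbo attachments, or Kafka messages.
The traceId then travels along with the service call.
SkyWalking uses ContextCarrier to carry this information.
For example, here the SkyWalking carrier items are passed through the MQ message's user-defined properties: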
Properties userProperties = message.getUserProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
    next = next.next();
    // Only copy items that actually carry a value.
    if (!StringUtil.isEmpty(next.getHeadValue())) {
        userProperties.setProperty(next.getHeadKey(), next.getHeadValue());
    }
}
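The loop above advances before it reads, which suggests that the object returned by items() is a head node with no header of its own. The following is a minimal plain-JDK sketch of that pattern, covering both the inject side and the matching extract side. It is a simplified model, not the SkyWalking classes: Item, the header names x-trace and x-correlation, and the sample values are all hypothetical.

```java
import java.util.Properties;

class CarrierRoundTrip {

    /** Hypothetical stand-in for a carrier item: one header key/value plus a link to the next item. */
    static final class Item {
        final String key;
        String value;
        final Item next;

        Item(String key, String value, Item next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }

        boolean hasNext() {
            return next != null;
        }
    }

    /** Builds a head node (empty key) followed by the real items; values may be null or empty. */
    static Item items(String traceValue, String correlationValue) {
        Item correlation = new Item("x-correlation", correlationValue, null);
        Item trace = new Item("x-trace", traceValue, correlation);
        return new Item("", "", trace);
    }

    /** Producer side: copy every non-empty item into the message's user properties. */
    static void inject(Item head, Properties userProperties) {
        Item cursor = head;
        while (cursor.hasNext()) {
            cursor = cursor.next; // step off the head before reading
            if (cursor.value != null && !cursor.value.isEmpty()) {
                userProperties.setProperty(cursor.key, cursor.value);
            }
        }
    }

    /** Consumer side: fill an empty carrier from the received user properties. */
    static Item extract(Properties userProperties) {
        Item head = items(null, null);
        Item cursor = head;
        while (cursor.hasNext()) {
            cursor = cursor.next;
            cursor.value = userProperties.getProperty(cursor.key);
        }
        return head;
    }

    public static void main(String[] args) {
        Properties userProperties = new Properties();
        inject(items("trace-123", ""), userProperties);
        Item received = extract(userProperties);
        System.out.println(userProperties.stringPropertyNames());
        System.out.println(received.next.value);
    }
}
```

Skipping empty values on the inject side keeps the message properties free of headers that carry nothing, while the extract side simply leaves missing headers as null.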
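1.1.1 Cross-thread propagation: ContextSnapshot
The name says it all: a snapshot of the context. The following is quoted directly from the official documentation.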
Besides cross-process tracing, cross-thread tracing has to be supported as well. For instance, both async process (in-memory MQ) and batch process are common in Java. Cross-process and cross-thread tracing are very similar in that they both require propagating context, except that cross-thread tracing does not require serialization.
Here are the three steps on cross-thread propagation:
- Use ContextManager#capture to get the ContextSnapshot object.
- Let the sub-thread access the ContextSnapshot through method arguments or being carried by existing arguments
- Use ContextManager#continued in sub-thread.
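The three steps can be sketched with plain JDK classes. This is a simplified model, not the SkyWalking API: the ThreadLocal trace id stands in for the tracing context, and Snapshot, capture and continued are hypothetical helpers named after ContextSnapshot, ContextManager#capture and ContextManager#continued.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SnapshotSketch {

    /** Per-thread tracing context, reduced to a trace id for this sketch. */
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Immutable copy of the context; it never leaves the JVM, so no serialization is needed. */
    static final class Snapshot {
        final String traceId;

        Snapshot(String traceId) {
            this.traceId = traceId;
        }
    }

    static void startTrace(String traceId) {
        TRACE_ID.set(traceId);
    }

    static String currentTraceId() {
        return TRACE_ID.get();
    }

    /** Step 1: capture the current thread's context. */
    static Snapshot capture() {
        return new Snapshot(TRACE_ID.get());
    }

    /** Step 3: continue the captured context in the current (sub) thread. */
    static void continued(Snapshot snapshot) {
        TRACE_ID.set(snapshot.traceId);
    }

    /** Runs a task on another thread and returns the trace id that thread observed. */
    static String traceIdSeenByWorker(String parentTraceId) {
        startTrace(parentTraceId);
        Snapshot snapshot = capture();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Step 2: the snapshot reaches the sub-thread by being captured in the task.
            Future<String> seen = pool.submit(() -> {
                continued(snapshot);
                try {
                    return currentTraceId();
                } finally {
                    TRACE_ID.remove();
                }
            });
            return seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("worker failed", e);
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceIdSeenByWorker("trace-42"));
    }
}
```

Without the continued call the worker thread would see no trace id at all, because a ThreadLocal value does not follow a task onto a pool thread.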
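2. Bytecode enhancement
Bytecode enhancement is done with Byte Buddy.
It is worth downloading the official Java agent source code and browsing through it first; it contains plenty of examples.
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-agent-core</artifactId>
    <version>${sky-agent-version}</version>
    <scope>provided</scope>
</dependency>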
The instrumentation class extends ClassInstanceMethodsEnhancePluginDefine.
For example, to enhance instances of the class com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl, which implements this interface:
package com.aliyun.openservices.ons.api;

public interface Consumer extends Admin {
    void subscribe(String var1, String var2, MessageListener var3);
    void subscribe(String var1, MessageSelector var2, MessageListener var3);
    void unsubscribe(String var1);
}
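Note: why not enhance MessageListener directly? Listeners are typically written as lambda expressions, as in the call below, and SkyWalking does not support enhancing those.
https://blog.csdn.net/weixin_39850981/article/details/118846538 (Byte Buddy supports it, but the official agent does not implement it)
So the plugin enhances subscribe and overrides its listener argument instead.
consumer.subscribe(topic, tag, (msg, context) -> {
    // message handling elided in the original
});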
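Overriding the argument amounts to wrapping the caller's listener in a decorator before it reaches the real subscribe method. Here is a minimal sketch with plain JDK types. Listener, TracingListener, beforeSubscribe and the events list are hypothetical stand-ins for MessageListener, the plugin's adapter, the before-method hook and the span lifecycle.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerWrapSketch {

    /** Hypothetical stand-in for MessageListener: a functional interface, so callers pass lambdas. */
    interface Listener {
        String consume(String message);
    }

    /** Decorator that surrounds the delegate with tracing steps. */
    static final class TracingListener implements Listener {
        private final Listener delegate;
        private final List<String> events;

        TracingListener(Listener delegate, List<String> events) {
            this.delegate = delegate;
            this.events = events;
        }

        @Override
        public String consume(String message) {
            events.add("start span for " + message);
            try {
                return delegate.consume(message);
            } finally {
                events.add("stop span");
            }
        }
    }

    /** Mimics a before-method hook that is allowed to override the call's arguments. */
    static void beforeSubscribe(Object[] allArguments, List<String> events) {
        Object listener = allArguments[2];
        // Wrap only once: a listener that is already a TracingListener is left alone.
        if (listener instanceof Listener && !(listener instanceof TracingListener)) {
            allArguments[2] = new TracingListener((Listener) listener, events);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Listener lambda = msg -> "CommitMessage";
        Object[] arguments = {"topic", "tag", lambda};
        beforeSubscribe(arguments, events);
        String result = ((Listener) arguments[2]).consume("hello");
        System.out.println(result + " " + events);
    }
}
```

The lambda itself is never modified. The wrapper is an ordinary class, so it does not matter that the original listener has no named class that could be matched for enhancement.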
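2.1 Which class do you enhance?
NameMatch.byName(ENHANCE_CLASS);
2.2 Which methods and constructors do you enhance?
A constructor interceptor (for example ProducerConstructorInterceptor) and an InstanceMethodsAroundInterceptor for message subscription.
The following is the enhancement definition for the Alibaba Cloud ONS consumer: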
public class ConsumerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    private static final String ENHANCE_CLASS = "com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl";
    private static final String CONSUMER_MESSAGE_METHOD = "subscribe";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerImplInterceptor";
    public static final String CONSTRUCTOR_INTERCEPT_TYPE = "java.util.Properties";
    public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerConstructorInterceptor";

    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
                }

                @Override
                public String getConstructorInterceptor() {
                    return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
                }
            }
        };
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    // Matches only subscribe(String, String, MessageListener).
                    return named(CONSUMER_MESSAGE_METHOD)
                        .and(takesArgumentWithType(1, "java.lang.String"))
                        .and(takesArgumentWithType(2, "com.aliyun.openservices.ons.api.MessageListener"));
                }

                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPTOR_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    // Must be true so the interceptor can replace the listener argument.
                    return true;
                }
            }
        };
    }
}
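2.3 Defining the enhancement logic
2.3.1 Consumer constructor handling: capturing the parameters
The listing below is the producer-side interceptor; the consumer-side ConsumerConstructorInterceptor referenced in 2.2 presumably follows the same pattern with ConfigConsumerPropertiesCache.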
/**
 * {@link com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl}
 *
 * @author wangji
 * @date 2022-04-12 09:46
 */
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
    private static final ILog LOGGER = LogManager.getLogger(ProducerConstructorInterceptor.class);

    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
        try {
            Properties properties = (Properties) allArguments[0];
            ConfigProducerPropertiesCache cache = new ConfigProducerPropertiesCache();
            cache.setNameServer(properties.getProperty(PropertyKeyConst.NAMESRV_ADDR));
            cache.setGroupId(properties.getProperty(PropertyKeyConst.GROUP_ID));
            cache.setmQType(properties.getProperty(PropertyKeyConst.MQType));
            // Keep the configuration on the enhanced instance so method interceptors can read it later.
            objInst.setSkyWalkingDynamicField(cache);
        } catch (Exception e) {
            LOGGER.error("ProducerImpl ", e);
        }
    }
}
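2.3.2 Consumer implementation: replacing the argument
consumer.subscribe(topic, tag, (msg, context) -> {
    // message handling elided in the original
});
The argument is replaced because callers write the listener as a lambda expression. While debugging and reading the logs, I found that enhancing the listener directly never took effect.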
/**
 * {@link com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl}
 *
 * @author wangji
 * @date 2022-04-09 18:03
 */
public class ConsumerImplInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        ConfigConsumerPropertiesCache configConsumerPropertiesCache = (ConfigConsumerPropertiesCache) objInst.getSkyWalkingDynamicField();
        Object messageListener = allArguments[2];
        if (null != messageListener) {
            if (messageListener instanceof EnhancedInstance) {
                // Already an EnhancedInstance: nothing to do.
            } else if (messageListener instanceof MessageListener) {
                MessageListenerCache messageListenerCache = new MessageListenerCache();
                messageListenerCache.setMessageListener((MessageListener) messageListener);
                messageListenerCache.setConsumerPropertiesCache(configConsumerPropertiesCache);
                // Replace the caller's listener with the tracing adapter.
                allArguments[2] = new MessageListenerAdapterInterceptor(messageListenerCache);
            }
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, Throwable throwable) {
    }
}
2.3.3 Consumer listener logic
Read the context information from the message's user properties, populate the ContextCarrier, and create the span.
/**
 * Message listener handling.
 *
 * @author wangji
 * @date 2022-04-11 18:42
 */
public class MessageListenerAdapterInterceptor implements MessageListener, EnhancedInstance {
    public static final String CONSUMER_OPERATION_NAME_PREFIX = "ALiYunOns/";
    private MessageListenerCache cache;

    public MessageListenerAdapterInterceptor(MessageListenerCache cache) {
        this.cache = cache;
    }

    @Override
    public Action consume(Message message, ConsumeContext context) {
        ContextCarrier contextCarrier = getContextCarrierFromMessage(message);
        ConfigConsumerPropertiesCache consumerPropertiesCache = cache.getConsumerPropertiesCache();
        AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + message
            .getTopic() + "/Consumer", contextCarrier);
        span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
        span.setPeer(consumerPropertiesCache.getNameServer());
        SpanLayer.asMQ(span);
        // Kept from the original; createEntrySpan above already receives the same carrier contents.
        ContextManager.extract(getContextCarrierFromMessage(message));
        StringTag groupIdTag = (StringTag) Tags.ofKey(PropertyKeyConst.GROUP_ID);
        groupIdTag.set(span, consumerPropertiesCache.getGroupId());
        StringTag messageModel = (StringTag) Tags.ofKey(PropertyKeyConst.MessageModel);
        messageModel.set(span, consumerPropertiesCache.getMessageModel());
        StringTag msgIdTag = (StringTag) Tags.ofKey(Message.SystemPropKey.MSGID);
        msgIdTag.set(span, message.getMsgID());
        StringTag tagTag = (StringTag) Tags.ofKey(Message.SystemPropKey.TAG);
        tagTag.set(span, message.getTag());
        if (message.getKey() != null && message.getKey().length() > 0) {
            StringTag keyTag = (StringTag) Tags.ofKey(Message.SystemPropKey.KEY);
            keyTag.set(span, message.getKey());
        }
        try {
            Action consume = cache.getMessageListener().consume(message, context);
            if (consume != null) {
                AbstractSpan activeSpan = ContextManager.activeSpan();
                Tags.MQ_STATUS.set(activeSpan, consume.name());
                // Anything other than CommitMessage is reported as an error on the span.
                if (consume != Action.CommitMessage) {
                    activeSpan.errorOccurred();
                }
            }
            return consume;
        } catch (Throwable t) {
            ContextManager.activeSpan().log(t);
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RuntimeException(t);
            }
        } finally {
            // Always close the entry span, whether consumption succeeded or failed.
            ContextManager.stopSpan();
        }
    }

    @Override
    public Object getSkyWalkingDynamicField() {
        return cache;
    }

    @Override
    public void setSkyWalkingDynamicField(Object o) {
    }

    private ContextCarrier getContextCarrierFromMessage(Message message) {
        ContextCarrier contextCarrier = new ContextCarrier();
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(message.getUserProperties(next.getHeadKey()));
        }
        return contextCarrier;
    }
}
3. Packaging
3.1 Defining the plugin configuration
src/main/resources/skywalking-plugin.def
# key=value format
# The key can be anything; the value is the fully qualified class name of the Instrumentation class
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ProducerInstrumentation
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ConsumerImplInstrumentation
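Both entries share the key aliyun-ons-plugin. Loaded through java.util.Properties, they would collapse into a single entry; the layout only works if every line is treated as its own name=class definition, which, as far as I can tell, is how the agent reads the file. Below is a minimal sketch of such line-by-line reading. PluginLine and parse are hypothetical names, not the agent's classes.

```java
import java.util.ArrayList;
import java.util.List;

class PluginDefSketch {

    /** One parsed line: plugin name on the left of '=', define class on the right. */
    static final class PluginLine {
        final String name;
        final String defineClass;

        PluginLine(String name, String defineClass) {
            this.name = name;
            this.defineClass = defineClass;
        }
    }

    /** Parses the file content line by line, so repeated names yield separate entries. */
    static List<PluginLine> parse(String content) {
        List<PluginLine> plugins = new ArrayList<>();
        for (String raw : content.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int separator = line.indexOf('=');
            if (separator <= 0 || separator == line.length() - 1) {
                throw new IllegalArgumentException("expected name=class, got: " + line);
            }
            plugins.add(new PluginLine(line.substring(0, separator).trim(),
                                       line.substring(separator + 1).trim()));
        }
        return plugins;
    }

    public static void main(String[] args) {
        String def = "# key=value format\n"
            + "aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ProducerInstrumentation\n"
            + "aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ConsumerImplInstrumentation\n";
        for (PluginLine plugin : parse(def)) {
            System.out.println(plugin.name + " -> " + plugin.defineClass);
        }
    }
}
```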
3.2 Maven shade configuration
SkyWalking shades Byte Buddy internally, so the plugin has to relocate it in the same way. After that, dropping the built jar into the agent's plugin directory is enough.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.gongdao</groupId>
    <artifactId>apm-aliyun-mq-ons-plugin</artifactId>
    <version>2022-04-15-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <sky-agent-version>8.9.0</sky-agent-version>
        <bytebuddy.version>1.11.18</bytebuddy.version>
        <shade.package>org.apache.skywalking.apm.dependencies</shade.package>
        <shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
        <shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>apm-agent-core</artifactId>
            <version>${sky-agent-version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>net.bytebuddy</groupId>
            <artifactId>byte-buddy</artifactId>
            <version>${bytebuddy.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>java-agent-util</artifactId>
            <version>${sky-agent-version}</version>