datax源码解析-datax的hook机制解析

Posted 犀牛饲养员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了datax源码解析-datax的hook机制解析相关的知识,希望对你有一定的参考价值。

datax源码解析-datax的hook机制解析

JobContainer的start方法,最后一步调用的是invokeHooks,这个方法就是datax的自定义hook被调用的地方。datax的hook提供了一种机制,可以让开发者再任务执行完成后做一些定制化的事情,比如给任务的负责人发送一条短信提醒之类的。

我们顺着invokeHooks方法来分析下,

private void invokeHooks() 
        Communication comm = super.getContainerCommunicator().collect();
        //配置也要传过去,因为你实现的hook可能需要一些参数,比如accessKey等,配置的格式是可以自己定义的
        HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter());
        invoker.invokeAll();
    

HookInvoker是datax实现hook机制的一个管理类,我们看到它接受三个参数,一个是目录,这里存放的是datax主目录+/hook。第二个参数是配置,因为你自己实现的hook可能需要一些外部配置的参数,比如你要实现发送短信功能,可能会把一些api密钥之类的东西放在配置里传进去。第三个参数是用来统计的。

接着看下invokeAll方法,

public void invokeAll() 
        if (!baseDir.exists() || baseDir.isFile()) 
            LOG.info("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath());
            return;
        

        //过滤子目录保存在subDirs
        String[] subDirs = baseDir.list(new FilenameFilter() 
            @Override
            public boolean accept(File dir, String name) 
                return new File(dir, name).isDirectory();
            
        );

        //没有子目录,不符合hook目录要求
        if (subDirs == null) 
            throw DataXException.asDataXException(FrameworkErrorCode.HOOK_LOAD_ERROR, "获取HOOK子目录返回null");
        

        /**
         * 子目录的格式示例:
         * META-INF/services/com.example.CodecSet
         */
        for (String subDir : subDirs) 
            doInvoke(new File(baseDir, subDir).getAbsolutePath());
        

    

首先是做一些基本的检查,确保目录结构是合法的。接着扫描给定目录的所有一级子目录,每个子目录当作一个Hook的目录。对于每个子目录,必须符合ServiceLoader的标准目录格式, 关于 ServiceLoader,可以参考java规范:

http://docs.oracle.com/javase/6/docs/api/java/util/ServiceLoader.html

hook的目录结构看起来类似这个样子:

简单来讲,ServiceLoader实现了一种机制,可以动态加载指定目录的实现类并且实例化,它是java SPI机制的重要组成部分。

Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,提供了通过interface寻找implement的方法。类似于IOC的思想,将装配的控制权移到程序之外,从而实现解耦。如下图所示:

继续看doInvoke方法,

private void doInvoke(String path) 
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        try 
            JarLoader jarLoader = new JarLoader(new String[]path);
            Thread.currentThread().setContextClassLoader(jarLoader);
            //这里通过 ServiceLoader 机制加载Hook的实现类
            Iterator<Hook> hookIt = ServiceLoader.load(Hook.class).iterator();
            if (!hookIt.hasNext()) 
                LOG.warn("No hook defined under path: " + path);
             else 
                Hook hook = hookIt.next();
                LOG.info("Invoke hook [], path: ", hook.getName(), path);
                hook.invoke(conf, msg);//调用实际的hook实现
            
         catch (Exception e) 
            LOG.error("Exception when invoke hook", e);
            throw DataXException.asDataXException(
                    CommonErrorCode.HOOK_INTERNAL_ERROR, "Exception when invoke hook", e);
         finally 
            Thread.currentThread().setContextClassLoader(oldClassLoader);
        
    

这里首先是把类加载器设置成jar包的加载器,这是继承的应用加载器,然后使用ServiceLoader加载用户自己是实现的Hook接口的实现类并进行实例化。实例化后调用invoke方法执行自定义的逻辑。Hook接口的定义如下:

/**
 * Created by xiafei.qiuxf on 14/12/17.
 * DataX 的 Hook 机制,这里定义了开放的接口
 * https://xie.infoq.cn/article/68102f356019f52560f4b8c70
 */
public interface Hook 

    /**
     * 返回名字
     *
     * @return
     */
    public String getName();

    /**
     * TODO 文档
     *
     * @param jobConf
     * @param msg
     */
    public void invoke(Configuration jobConf, Map<String, Number> msg);


我们自己的实现类示例:

public class SMSReport implements Hook 
	

	@Override
	public String getName() 
		return "SMSReportHook";
	

	@Override
	public void invoke(Configuration configuration, Map<String, Number> map) 
		//调用第三方的短信api发送短信

		....
	

实现类写完后我们打成jar包,在 DataX 根目录下,新建hook文件夹,然后在其下创建sms文件夹,将jar包放在这里就可以了。

我们来总结下:

datax提供了一种Hook机制,可以在执行完核心逻辑后触发一个开发者自己定义的逻辑。实现的原理是利用了java SPI机制,datax定义了一个Hook接口,开发者实现这个接口。通过interface寻找implement的方法。


参考:

  • https://xie.infoq.cn/article/68102f356019f52560f4b8c70
  • https://docs.oracle.com/javase/6/docs/api/java/util/ServiceLoader.html

以上是关于datax源码解析-datax的hook机制解析的主要内容,如果未能解决你的问题,请参考以下文章

datax源码解析-任务拆分机制详解

datax源码解析-任务拆分机制详解

datax源码解析-任务调度机制解析

datax源码解析-任务调度机制解析

datax源码解析-JobContainer的初始化阶段解析

datax源码解析-启动类分析