ChunJun Task Submission - Source Code Analysis

Posted by 蒋含竹


  • Version: ChunJun 1.12
  • Note: if you have read the Spark or Flink task-submission source code, you will notice that the part "from the shell submission all the way to reflectively executing the actual Flink application code (the main method)" is very similar to Spark and Flink.

Parsing the task submission parameter configuration

  1. Example of a task submission command
# What is executed manually (example)
sh bin/chunjun-local.sh  -job my-examples/task_script_multi_table.json

# What ChunJun prints (example)
start command: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/bin/java -cp /home/chunjun/chunjun-dist/../lib/* com.dtstack.chunjun.client.Launcher -job my-examples/task_script_multi_table.json -mode local -jobType sync -chunjunDistDir /home/chunjun/chunjun-dist
  2. This command runs java and invokes the main method of class com.dtstack.chunjun.client.Launcher (a standalone sketch of its argument-pairing logic follows the code block below).
package com.dtstack.chunjun.client;

// import ...

public class Launcher {
    // code ...

    public static void main(String[] args) throws Exception {
        OptionParser optionParser = new OptionParser(args);
        Options launcherOptions = optionParser.getOptions();

        findDefaultConfigDir(launcherOptions);

        List<String> argList = optionParser.getProgramExeArgList();

        // Convert argList into a HashMap so that values can be looked up by parameter name
        HashMap<String, String> temp = new HashMap<>(16);
        for (int i = 0; i < argList.size(); i += 2) {
            temp.put(argList.get(i), argList.get(i + 1));
        }

        // Clear the list and refill it with the processed parameter values
        argList.clear();
        for (int i = 0; i < temp.size(); i++) {
            argList.add(temp.keySet().toArray()[i].toString());
            argList.add(temp.values().toArray()[i].toString());
        }

        JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);

        ClusterClientHelper clusterClientHelper;
        switch (ClusterMode.getByName(launcherOptions.getMode())) {
            case local:
                clusterClientHelper = new LocalClusterClientHelper();
                break;
            case standalone:
                clusterClientHelper = new StandaloneClusterClientHelper();
                break;
            case yarnSession:
                clusterClientHelper = new YarnSessionClusterClientHelper();
                break;
            case yarnPerJob:
                clusterClientHelper = new YarnPerJobClusterClientHelper();
                break;
            case yarnApplication:
                throw new ClusterDeploymentException(
                        "Application Mode not supported by Yarn deployments.");
            case kubernetesSession:
                clusterClientHelper = new KubernetesSessionClusterClientHelper();
                break;
            case kubernetesPerJob:
                throw new ClusterDeploymentException(
                        "Per-Job Mode not supported by Kubernetes deployments.");
            case kubernetesApplication:
                clusterClientHelper = new KubernetesApplicationClusterClientHelper();
                break;
            default:
                throw new ClusterDeploymentException(
                        launcherOptions.getMode() + " Mode not supported.");
        }

        // add ext class
        URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
        List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
        ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);
        clusterClientHelper.submit(jobDeployer);
    }

    // code ...
}
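The two loops in main above pair the flat argument list into name/value entries and then flatten it back, which also deduplicates repeated flags (the later value wins). A small, standalone sketch of that pairing logic, using made-up arguments and a simplified map-based flattening:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ArgPairingDemo {
    public static void main(String[] args) {
        // Hypothetical flat argument list as Launcher receives it: -name value pairs
        List<String> argList = new ArrayList<>(Arrays.asList(
                "-job", "my-examples/task_script_multi_table.json",
                "-mode", "local",
                "-mode", "standalone")); // repeated flag: the later value overwrites the earlier one

        // Pair the list into a map keyed by parameter name (same idea as Launcher.main)
        Map<String, String> byName = new LinkedHashMap<>();
        for (int i = 0; i < argList.size(); i += 2) {
            byName.put(argList.get(i), argList.get(i + 1));
        }

        // Flatten it back into "-name value" order for the downstream JobDeployer
        argList.clear();
        for (Map.Entry<String, String> e : byName.entrySet()) {
            argList.add(e.getKey());
            argList.add(e.getValue());
        }

        System.out.println(argList); // [-job, my-examples/task_script_multi_table.json, -mode, standalone]
    }
}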

  3. When optionParser.getProgramExeArgList() is called, the value of the job parameter (a file path) is used to read the file content (the sync or sql script); a URL-encoding round-trip sketch follows the code block below.
package com.dtstack.chunjun.options;

// import ...

public class OptionParser {

    @VisibleForTesting protected static final String OPTION_JOB = "job";

    // code ...

    public List<String> getProgramExeArgList() throws Exception {
        Map<String, Object> mapConf = MapUtil.objectToMap(properties);
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, Object> one : mapConf.entrySet()) {
            String key = one.getKey();
            Object value = one.getValue();
            if (value == null) {
                continue;
            } else if (OPTION_JOB.equalsIgnoreCase(key)) {
                File file = new File(value.toString());
                try (FileInputStream in = new FileInputStream(file)) {
                    byte[] fileContent = new byte[(int) file.length()];
                    in.read(fileContent);
                    value =
                            URLEncoder.encode(
                                    new String(fileContent, StandardCharsets.UTF_8),
                                    StandardCharsets.UTF_8.name());
                }
            }
            args.add("-" + key);
            args.add(value.toString());
        }
        return args;
    }

    // code ...
}
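The whole script content is passed downstream as the value of a single -job argument, so it is URL-encoded here and decoded again in Main.main before parsing. A minimal round-trip sketch using only the JDK (the script content is a made-up placeholder):

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class JobContentEncodingDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical job script content with characters that are awkward in a CLI argument
        String jobJson = "{\"job\":{\"content\":[{}],\"setting\":{}}}";

        // What OptionParser.getProgramExeArgList does before adding the value to the args list
        String encoded = URLEncoder.encode(jobJson, StandardCharsets.UTF_8.name());

        // What Main.main does before handing the script to the parser
        String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());

        System.out.println(encoded);
        System.out.println(decoded.equals(jobJson)); // true
    }
}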

  4. Different job types go through different ClusterClientHelper implementations, for example LocalClusterClientHelper (a sketch of the reflective main-method invocation used by the non-local modes follows the snippet below).
package com.dtstack.chunjun.client.local;

// import ...

public class LocalClusterClientHelper implements ClusterClientHelper {
    @Override
    public ClusterClient submit(JobDeployer jobDeployer) throws Exception {
        String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
        Main.main(args);
        return null;
    }
}

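LocalClusterClientHelper simply calls Main.main in the same JVM. In the non-local modes the entry class is resolved by name (the next step mentions PluginInfoUtil.getMainClass()) and its main method is invoked through reflection; a generic sketch of that pattern (class name and arguments are illustrative, this is not ChunJun's exact code):

import java.lang.reflect.Method;

public class ReflectiveMainInvokeDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical entry class; in ChunJun it would come from PluginInfoUtil.getMainClass()
        String mainClassName = "com.dtstack.chunjun.Main";
        String[] programArgs = {"-mode", "local", "-jobType", "sync"};

        // Load the class (it must be on the classpath) and invoke its static main(String[])
        Class<?> mainClass = Class.forName(mainClassName);
        Method mainMethod = mainClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) programArgs);
    }
}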
  5. LocalClusterClientHelper then executes the main method of Main, passing in the arguments (the other modes are similar: they use PluginInfoUtil.getMainClass() to obtain the class to execute).
  6. The args submitted earlier are passed along into this main method; after parsing and processing, the job is dispatched by type (SQL or SYNC) and replacedJob (i.e. the task script we wrote) is passed on (an illustrative ${var} substitution sketch follows the code block below).
package com.dtstack.chunjun;

// import ...

public class Main {

    public static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        LOG.info("------------program params-------------------------");
        Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
        LOG.info("-------------------------------------------");

        Options options = new OptionParser(args).getOptions();
        String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
        String replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
        Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
        StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
        StreamTableEnvironment tEnv =
                EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
        LOG.info(
                "Register to table configuration:{}",
                tEnv.getConfig().getConfiguration().toString());
        switch (EJobType.getByName(options.getJobType())) {
            case SQL:
                exeSqlJob(env, tEnv, replacedJob, options);
                break;
            case SYNC:
                exeSyncJob(env, tEnv, replacedJob, options);
                break;
            default:
                throw new ChunJunRuntimeException(
                        "unknown jobType: ["
                                + options.getJobType()
                                + "], jobType must in [SQL, SYNC].");
        }

        LOG.info("program {} execution success", options.getJobName());
    }

    // code ...
}
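JobUtil.replaceJobParameter substitutes the variables passed via -p into the job script; its real implementation is not shown here. A hedged, simplified sketch of what such ${var} substitution can look like (the method and parameter types below are illustrative, not ChunJun's actual API):

import java.util.HashMap;
import java.util.Map;

public class PlaceholderReplaceDemo {
    // Illustrative only: naive ${name} substitution, not ChunJun's JobUtil implementation
    static String replaceJobParameter(Map<String, String> params, String job) {
        String result = job;
        for (Map.Entry<String, String> e : params.entrySet()) {
            result = result.replace("${" + e.getKey() + "}", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("tableName", "user_info");

        String job = "{\"reader\":{\"table\":\"${tableName}\"}}";
        System.out.println(replaceJobParameter(params, job));
        // prints {"reader":{"table":"user_info"}}
    }
}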

ChunJun Task - SYNC

  1. Taking SYNC as the example, exeSyncJob is called next.
  2. Parsing the task script: parseConf and then SyncConf.parseJob are called in turn, and Gson finally deserializes the task script into a com.dtstack.chunjun.conf.SyncConf object (a plain-Gson sketch of the same pattern follows the SyncConf.parseJob snippet below). The key code:
    • exeSyncJob
package com.dtstack.chunjun;

public class Main {
    // code ...

    private static void exeSyncJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options)
            throws Exception {
        SyncConf config = parseConf(job, options);

        // code ...
    }

    // code ...
}

    • parseConf
package com.dtstack.chunjun;

public class Main {
    // code ...

    public static SyncConf parseConf(String job, Options options) {
        SyncConf config;
        try {
            config = SyncConf.parseJob(job);

            // code ...
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
        return config;
    }

    // code ...
}

    • SyncConf.parseJob
package com.dtstack.chunjun.conf;

// import ...

public class SyncConf {
    // code ...

    public static SyncConf parseJob(String jobJson) {
        SyncConf config = GsonUtil.GSON.fromJson(jobJson, SyncConf.class);
        checkJob(config);
        return config;
    }

    // code ...
}
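SyncConf.parseJob is essentially a thin wrapper around Gson deserialization plus validation. The same pattern with a plain Gson instance and made-up, simplified stand-in classes (not the real SyncConf/JobConf):

import com.google.gson.Gson;
import java.util.List;
import java.util.Map;

public class GsonParseDemo {
    // Hypothetical, stripped-down stand-ins for SyncConf/JobConf, for illustration only
    static class MiniJob {
        List<Map<String, Object>> content;
        Map<String, Object> setting;
    }

    static class MiniSyncConf {
        MiniJob job;
    }

    public static void main(String[] args) {
        String jobJson = "{\"job\":{\"content\":[{\"reader\":{},\"writer\":{}}],\"setting\":{}}}";
        MiniSyncConf conf = new Gson().fromJson(jobJson, MiniSyncConf.class);
        System.out.println(conf.job.content.size()); // 1
    }
}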

  3. com.dtstack.chunjun.conf.SyncConf has a member variable JobConf job.
package com.dtstack.chunjun.conf;

// import ...

public class SyncConf implements Serializable {
    private static final long serialVersionUID = 1L;

    /** ChunJun job */
    private JobConf job;

    /** Plugin package path on the ChunJun submission side */
    private String pluginRoot;
    /** Remote plugin package path on the server where ChunJun runs */
    private String remotePluginPath;

    private String savePointPath;

    /** List of plugin jar paths required by this job */
    private List<String> syncJarList;

    // code ...
}

  4. com.dtstack.chunjun.conf.JobConf holds the content and setting sections of the task script we configured.
    • Note that although content is a JSON array in the script, only its first element is currently used during parsing.
package com.dtstack.chunjun.conf;

// import ...

public class JobConf implements Serializable {
    private static final long serialVersionUID = 1L;

    private LinkedList<ContentConf> content;
    private SettingConf setting = new SettingConf();

    public OperatorConf getReader() {
        return content.get(0).getReader();
    }

    public OperatorConf getWriter() {
        return content.get(0).getWriter();
    }

    public CdcConf getCdcConf() {
        return content.get(0).getRestoration();
    }

    public MappingConf getNameMapping() {
        return content.get(0).getNameMapping();
    }

    public TransformerConf getTransformer() {
        return content.get(0).getTransformer();
    }

    public LinkedList<ContentConf> getContent() {
        return content;
    }

    public void setContent(LinkedList<ContentConf> content) {
        this.content = content;
    }

    public SettingConf getSetting() {
        return setting;
    }

    public void setSetting(SettingConf setting) {
        this.setting = setting;
    }

    @Override
    public String toString() {
        return "JobConf{" + "content=" + content + ", setting=" + setting + '}';
    }
}

  5. Jumping back to com.dtstack.chunjun.Main, look at the exeSyncJob method again (a minimal Flink sketch of the readerChannel parallelism pattern follows the snippet below).
package com.dtstack.chunjun;

// import ...

public class Main {

    // code ...

    private static void exeSyncJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options)
            throws Exception {
        SyncConf config = parseConf(job, options);
        configStreamExecutionEnvironment(env, options, config);

        SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
        DataStream<RowData> dataStreamSource = sourceFactory.createSource();
        SpeedConf speed = config.getSpeed();
        if (speed.getReaderChannel() > 0) {
            dataStreamSource =
                    ((DataStreamSource<RowData>) dataStreamSource)
                            .setParallelism(speed.getReaderChannel());
        }

        dataStreamSource = addMappingOperator(config, dataStreamSource);

        if (null != config.getCdcConf()
                && (null != config.getCdcConf().getDdl()
                        && null != config.getCdcConf().getCache())) {
            CdcConf cdcConf = config.getCdcConf();
            DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConf, config);

            // code ...
        }

        // code ...
    }

    // code ...
}

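In exeSyncJob, the reader parallelism comes from the setting.speed block of the script (speed.getReaderChannel()) and is applied directly on the source DataStreamSource. A minimal Flink sketch of the same pattern, using a built-in parallel sequence source in place of a ChunJun SourceFactory (the readerChannel value is hard-coded for illustration):

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReaderChannelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for sourceFactory.createSource(): a parallel built-in source
        DataStreamSource<Long> source = env.fromSequence(0, 99);

        // Same idea as exeSyncJob: if readerChannel > 0, apply it as the source parallelism
        int readerChannel = 2; // hypothetical value taken from setting.speed.readerChannel
        if (readerChannel > 0) {
            source.setParallelism(readerChannel);
        }

        source.print();
        env.execute("reader-channel-demo");
    }
}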