ChunJun任务提交-源码分析
Posted 蒋含竹
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ChunJun任务提交-源码分析相关的知识,希望对你有一定的参考价值。
ChunJun任务提交-源码分析
- 版本 ChunJun 1.12
- 注：阅读过 Spark、Flink 任务提交源码的朋友应该能看出，“从 Shell 提交任务，到反射执行实际的 Flink 应用代码（main 方法）”这一部分与 Spark、Flink 的实现非常相似
任务提交的参数配置解析
- 任务提交命令样例
# Example: the command run by hand
sh bin/chunjun-local.sh -job my-examples/task_script_multi_table.json
# Example: what chunjun prints in response (the java command line it launches)
start command: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/bin/java -cp /home/chunjun/chunjun-dist/../lib/* com.dtstack.chunjun.client.Launcher -job my-examples/task_script_multi_table.json -mode local -jobType sync -chunjunDistDir /home/chunjun/chunjun-dist
- 该命令会启动 Java，运行类 `com.dtstack.chunjun.client.Launcher` 的 `main` 方法
package com.dtstack.chunjun.client;
// import ...
public class Launcher
// code ...
public static void main(String[] args) throws Exception
OptionParser optionParser = new OptionParser(args);
Options launcherOptions = optionParser.getOptions();
findDefaultConfigDir(launcherOptions);
List<String> argList = optionParser.getProgramExeArgList();
// 将argList转化为HashMap,方便通过参数名称来获取参数值
HashMap<String, String> temp = new HashMap<>(16);
for (int i = 0; i < argList.size(); i += 2)
temp.put(argList.get(i), argList.get(i + 1));
// 清空list,填充修改后的参数值
argList.clear();
for (int i = 0; i < temp.size(); i++)
argList.add(temp.keySet().toArray()[i].toString());
argList.add(temp.values().toArray()[i].toString());
JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);
ClusterClientHelper clusterClientHelper;
switch (ClusterMode.getByName(launcherOptions.getMode()))
case local:
clusterClientHelper = new LocalClusterClientHelper();
break;
case standalone:
clusterClientHelper = new StandaloneClusterClientHelper();
break;
case yarnSession:
clusterClientHelper = new YarnSessionClusterClientHelper();
break;
case yarnPerJob:
clusterClientHelper = new YarnPerJobClusterClientHelper();
break;
case yarnApplication:
throw new ClusterDeploymentException(
"Application Mode not supported by Yarn deployments.");
case kubernetesSession:
clusterClientHelper = new KubernetesSessionClusterClientHelper();
break;
case kubernetesPerJob:
throw new ClusterDeploymentException(
"Per-Job Mode not supported by Kubernetes deployments.");
case kubernetesApplication:
clusterClientHelper = new KubernetesApplicationClusterClientHelper();
break;
default:
throw new ClusterDeploymentException(
launcherOptions.getMode() + " Mode not supported.");
// add ext class
URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);
clusterClientHelper.submit(jobDeployer);
// code ...
- 调用 `optionParser.getProgramExeArgList()` 时，会根据 `job` 参数的值（即任务脚本的文件路径）读取文件内容（sync 或 sql 脚本），并对内容做 URL 编码后作为 `-job` 的参数值
package com.dtstack.chunjun.options;
// import ...
public class OptionParser
@VisibleForTesting protected static final String OPTION_JOB = "job";
// code ...
public List<String> getProgramExeArgList() throws Exception
Map<String, Object> mapConf = MapUtil.objectToMap(properties);
List<String> args = new ArrayList<>();
for (Map.Entry<String, Object> one : mapConf.entrySet())
String key = one.getKey();
Object value = one.getValue();
if (value == null)
continue;
else if (OPTION_JOB.equalsIgnoreCase(key))
File file = new File(value.toString());
try (FileInputStream in = new FileInputStream(file))
byte[] fileContent = new byte[(int) file.length()];
in.read(fileContent);
value =
URLEncoder.encode(
new String(fileContent, StandardCharsets.UTF_8),
StandardCharsets.UTF_8.name());
args.add("-" + key);
args.add(value.toString());
return args;
// code ...
- 不同部署模式（`-mode`）的任务会使用不同的 `ClusterClientHelper`，例如 local 模式对应 `LocalClusterClientHelper`
package com.dtstack.chunjun.client.local;
// import ...
public class LocalClusterClientHelper implements ClusterClientHelper
@Override
public ClusterClient submit(JobDeployer jobDeployer) throws Exception
String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
Main.main(args);
return null;
- 接着 `LocalClusterClientHelper` 会执行 `Main` 中的 `main` 方法并传入参数（其他模式也类似，会通过 `PluginInfoUtil.getMainClass()` 获取要执行的 class）
- 之前提交的 `args` 会随调用传入 `main` 方法，经过解析、处理（URL 解码、参数替换）后，再按 SQL、SYNC 区分任务种类，把 `replacedJob`（即我们编写的任务脚本）传给对应的执行方法
package com.dtstack.chunjun;
// import ...
public class Main
public static Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws Exception
LOG.info("------------program params-------------------------");
Arrays.stream(args).forEach(arg -> LOG.info("", arg));
LOG.info("-------------------------------------------");
Options options = new OptionParser(args).getOptions();
String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
String replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
StreamTableEnvironment tEnv =
EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
LOG.info(
"Register to table configuration:",
tEnv.getConfig().getConfiguration().toString());
switch (EJobType.getByName(options.getJobType()))
case SQL:
exeSqlJob(env, tEnv, replacedJob, options);
break;
case SYNC:
exeSyncJob(env, tEnv, replacedJob, options);
break;
default:
throw new ChunJunRuntimeException(
"unknown jobType: ["
+ options.getJobType()
+ "], jobType must in [SQL, SYNC].");
LOG.info("program execution success", options.getJobName());
// code ...
ChunJun任务-SYNC
- 以 `SYNC` 为例，接下来会调用 `exeSyncJob`
- 任务脚本的解析：依次调用 `parseConf` 和 `SyncConf.parseJob`，最终利用 `Gson` 将任务脚本解析为 `com.dtstack.chunjun.conf.SyncConf` 对象。关键代码如下：
- exeSyncJob
package com.dtstack.chunjun;
public class Main
// code ...
// Excerpt: only the first step of exeSyncJob is shown here. The SYNC job script (already
// URL-decoded and parameter-substituted by main) is parsed into a SyncConf.
private static void exeSyncJob(
StreamExecutionEnvironment env,
StreamTableEnvironment tableEnv,
String job,
Options options)
throws Exception
SyncConf config = parseConf(job, options);
// code ...
// code ...
- parseConf
package com.dtstack.chunjun;
public class Main
// code ...
/**
 * Parses the SYNC job script (JSON) into a SyncConf via SyncConf.parseJob.
 * Any Exception raised inside the try block, including from the elided code, is rethrown
 * wrapped in a ChunJunRuntimeException.
 * NOTE(review): `options` is not used in the visible lines; presumably the elided code uses it.
 */
public static SyncConf parseConf(String job, Options options)
SyncConf config;
try
config = SyncConf.parseJob(job);
// code ...
catch (Exception e)
throw new ChunJunRuntimeException(e);
return config;
// code ...
- SyncConf.parseJob
package com.dtstack.chunjun.conf;
// import ...
public class SyncConf
// code ...
public static SyncConf parseJob(String jobJson)
SyncConf config = GsonUtil.GSON.fromJson(jobJson, SyncConf.class);
checkJob(config);
return config;
// code ...
`com.dtstack.chunjun.conf.SyncConf` 中有成员变量 `JobConf job` 等
package com.dtstack.chunjun.conf;
// import ...
public class SyncConf implements Serializable
private static final long serialVersionUID = 1L;
/** ChunJun job: the script's "job" section (content + setting, see JobConf). */
private JobConf job;
/** Plugin package path on the ChunJun submitting (client) side. */
private String pluginRoot;
/** Remote plugin package path on the servers where ChunJun runs. */
private String remotePluginPath;
// NOTE(review): presumably the savepoint path the job is restored from - confirm.
private String savePointPath;
/** Paths of the plugin jars required by this job. */
private List<String> syncJarList;
// code ...
`com.dtstack.chunjun.conf.JobConf` 中则是我们在任务脚本中配置的 `content` 和 `setting`
- 可以看到，`content` 在脚本中虽然是 JSONArray，但 `JobConf` 的各个 getter 目前只读取第一条配置（`content.get(0)`）
package com.dtstack.chunjun.conf;
// import ...
public class JobConf implements Serializable
private static final long serialVersionUID = 1L;
private LinkedList<ContentConf> content;
private SettingConf setting = new SettingConf();
public OperatorConf getReader()
return content.get(0).getReader();
public OperatorConf getWriter()
return content.get(0).getWriter();
public CdcConf getCdcConf()
return content.get(0).getRestoration();
public MappingConf getNameMapping()
return content.get(0).getNameMapping();
public TransformerConf getTransformer()
return content.get(0).getTransformer();
public LinkedList<ContentConf> getContent()
return content;
public void setContent(LinkedList<ContentConf> content)
this.content = content;
public SettingConf getSetting()
return setting;
public void setSetting(SettingConf setting)
this.setting = setting;
@Override
public String toString()
return "JobConf" + "content=" + content + ", setting=" + setting + '';
- 回到 `com.dtstack.chunjun.Main`，继续看 `exeSyncJob` 方法
package com.dtstack.chunjun;
// import ...
public class Main
// code ...
// Sets up a SYNC job on the given Flink environments. This excerpt is truncated below,
// partway through the method.
private static void exeSyncJob(
StreamExecutionEnvironment env,
StreamTableEnvironment tableEnv,
String job,
Options options)
throws Exception
// Parse the job script into a SyncConf, then apply it to the stream execution environment.
SyncConf config = parseConf(job, options);
configStreamExecutionEnvironment(env, options, config);
// Discover the source factory for this configuration and create the source stream.
SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
DataStream<RowData> dataStreamSource = sourceFactory.createSource();
// Override the source parallelism only when a positive readerChannel is configured.
// NOTE(review): the cast assumes createSource() returns a DataStreamSource; any other
// DataStream subtype would fail here with a ClassCastException.
SpeedConf speed = config.getSpeed();
if (speed.getReaderChannel() > 0)
dataStreamSource =
((DataStreamSource<RowData>) dataStreamSource)
.setParallelism(speed.getReaderChannel());
// Add the mapping operator (presumably driven by the nameMapping config - confirm).
dataStreamSource = addMappingOperator(config, dataStreamSource);
// Only when a CDC config with both ddl and cache settings is present: discover a DDL handler.
if (null != config.getCdcConf()
&& (null != config.getCdcConf().getDdl()
&& null != config.getCdcConf().getCache()))
CdcConf cdcConf = config.getCdcConf();
DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConf, config);
纯钧chunjun的http-x插件修复