datax源码解析-JobContainer的初始化阶段解析
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了datax源码解析-JobContainer的初始化阶段解析相关的知识,希望对你有一定的参考价值。
datax源码解析-JobContainer的初始化阶段解析
写在前面
此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。
我所使用的任务模版的json文件是:
"job":
"content":[
"reader":
"name":"mysqlreader",
"parameter":
"column":[
"id",
"name",
"age"
],
"connection":[
"jdbcUrl":[
"jdbc:mysql://127.0.0.1:3306/test"
],
"table":[
"t_datax_test"
]
],
"password":"11111111",
"username":"root"
,
"writer":
"name":"mysqlwriter",
"parameter":
"column":[
"id",
"name",
"age"
],
"connection":[
"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2",
"table":[
"t_datax_test"
]
],
"password":"11111111",
"username":"root"
],
"setting":
"speed":
"channel":"2"
另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。
JobContainer初始化阶段
接着上篇文章:
进入JobContainer的start
方法,jobContainer主要负责的工作全部在start()里面,包括:
- preHandle,前置处理
- init,初始化,主要是调用插件的init方法实现初始化
- prepare,准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作
- split,根据配置的并发参数,对job进行切分,切分为多个task
- scheduler,把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
- post,执行完任务后的后置操作
- invokeHooks,DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等
从代码中看,也可以清晰的看到这几个过程:
public void start()
LOG.info("DataX jobContainer starts job.");
boolean hasException = false;
boolean isDryRun = false;
try
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if(isDryRun)
LOG.info("jobContainer starts to do preCheck ...");
this.preCheck();
else
//线程安全考虑
userConf = configuration.clone();
LOG.debug("jobContainer starts to do preHandle ...");
初始化preHandler插件并执行插件的preHandler
this.preHandle();
LOG.debug("jobContainer starts to do init ...");
//初始化reader和writer
this.init();
LOG.info("jobContainer starts to do prepare ...");
//准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作。
this.prepare();
LOG.info("jobContainer starts to do split ...");
//拆分task,实际的拆分工作还是调用插件的实现
this.totalStage = this.split();
LOG.info("jobContainer starts to do schedule ...");
//把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
this.schedule();
LOG.debug("jobContainer starts to do post ...");
//执行任务后的操作
this.post();
LOG.debug("jobContainer starts to do postHandle ...");
//不知道是干啥的
this.postHandle();
LOG.info("DataX jobId [] completed successfully.", this.jobId);
//DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等
this.invokeHooks();
...
本篇文章只关注前面三个部分,也就是preHandle,init,prepare三个阶段,我认为这三个阶段都属于任务开始前的初始化阶段。
preHandler
preHandler目前官方也没有实现,com.alibaba.datax.common.plugin.AbstractPlugin#preHandler
方法目前是空的,所以这里我们也先略过。
init
继续看init方法,
private void init()
//从配置中获取jobid
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
if (this.jobId < 0)
LOG.info("Set jobId = 0");
this.jobId = 0;
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
Thread.currentThread().setName("job-" + this.jobId);
//DataX所有的状态及统计信息交互类,job、taskGroup、task等的消息汇报
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
//必须先Reader ,后Writer
this.jobReader = this.initJobReader(jobPluginCollector);
//writer的初始化做的事情会多一些,比如会检查写入表的字段和指定的字段个数是否一致等
this.jobWriter = this.initJobWriter(jobPluginCollector);
可以看到,init方法分别调用的是reader和writer的init方法进行初始化。先来看下initJobReader
方法,
private Reader.Job initJobReader(
JobPluginCollector jobPluginCollector)
this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
//loadJobPlugin需要用到jarLoader
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
// 设置reader的jobConfig
jobReader.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
// 设置reader的readerConfig
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
jobReader.setJobPluginCollector(jobPluginCollector);
//调用插件自己内部的init方法进行个性初始化,以mysql的初始化为例
//mysql reader会检查username,password等是否存在
jobReader.init();
classLoaderSwapper.restoreCurrentThreadClassLoader();
return jobReader;
首先看这个方法返回的是Reader.Job这样的一个内部类,这个类是AbstractJobPlugin
的一个实现。所以返回的其实是一个reader插件的实例。
接着看到是com.alibaba.datax.core.util.container.LoadUtil#getJarLoader
方法,它根据类型和名称从缓存中获取,如果没有则去创建,创建的流程首先获取插件的路径.比如:D:\\DataX\\target\\datax\\datax\\plugin\\reader\\mysqlreader
,然后根据JarLoader里面的getURLs(paths)获取插件路径下所有的jar包。创建单独的JarLoader,把创建的JarLoader缓存起来。
然后它返回一个是一个自定义的类加载器JarLoader,根据java类加载器的原理我们知道,JarLoader
是Application ClassLoader的子类。DataX通过Thread.currentThread().setContextClassLoader在每次对插件调用前后的进行classLoader的切换实现jar隔离的加载机制。
接下里的loadJobPlugin
就会用到这个类加载器去实例化插件的实现类。
插件加载这部分的设计还是值得学习的,即实现了jar的隔离加载,也实现了热加载功能。
最后就是调用插件本身的init方法,以mysql为例,这里主要是检查 username/password 配置是否存在等。
writer的初始化流程基本是一样的,这里不展开了。
prepare
prepare也是调用插件的prepare方法进行准备阶段的工作,
private void prepare()
this.prepareJobReader();
this.prepareJobWriter();
mysql reader的prepare没有实现,意味着不需要prepare,我们直接来看下writer的prepare方法。
// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)
public void prepare(Configuration originalConfig)
int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);
if (tableNumber == 1)
String username = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
List<Object> conns = originalConfig.getList(Constant.CONN_MARK,
Object.class);
Configuration connConf = Configuration.from(conns.get(0)
.toString());
// 这里的 jdbcUrl 已经 append 了合适后缀参数
String jdbcUrl = connConf.getString(Key.JDBC_URL);
originalConfig.set(Key.JDBC_URL, jdbcUrl);
//表名
String table = connConf.getList(Key.TABLE, String.class).get(0);
originalConfig.set(Key.TABLE, table);
//如果有需要提前执行的sql,比如清空表等
List<String> preSqls = originalConfig.getList(Key.PRE_SQL,
String.class);
/**
* sql转换,比如把@table换成实际的table name
*
*/
List<String> renderedPreSqls = WriterUtil.renderPreOrPostSqls(
preSqls, table);
originalConfig.remove(Constant.CONN_MARK);
if (null != renderedPreSqls && !renderedPreSqls.isEmpty())
// 说明有 preSql 配置,则此处删除掉
originalConfig.remove(Key.PRE_SQL);
Connection conn = DBUtil.getConnection(dataBaseType,
jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[]. context info:.",
StringUtils.join(renderedPreSqls, ";"), jdbcUrl);
WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, dataBaseType);
DBUtil.closeDBResources(null, null, conn);
其实prepare的核心思想就是,看下任务的配置文件有没有需要提前执行的sql,比如清空表之类的,有的话就先执行了。
参考:
- https://www.cnblogs.com/xmzpc/p/15193622.html
以上是关于datax源码解析-JobContainer的初始化阶段解析的主要内容,如果未能解决你的问题,请参考以下文章