datax分析与思考(一)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了datax分析与思考(一)相关的知识,希望对你有一定的参考价值。
参考技术A 先看执行的第一个步骤:在最上层抽象类,这个里面相当于获取全局公共信息,java入口部分就是这个Engine的main方法直接启动
阿里这边有提供多级JSON配置信息无损存储
但是平常的使用中很少有直接以json的格式操作,一般是转成对象bean,然后进行操作,
他在这边新写这个方法的目的主要在于对封闭的json进行处理,直接从key获取到value,但是我感觉还是可以直接转成bean的形式去获取比较方便.毕竟只有一个json的文件
直接main方法启动,在datax.py脚本中设置了锁需要的参数和json地址, 在启动的时候通过读取启动参数,获取到当前的启动配置文件.
这边阿里使用了
这个包 ,然后使用
来根据参数的名字进行读取.这个地方平常很少有用到,可以记录下.
datax在执行的过程中打了很多的日志,有个打印vminfo的class我感觉可以在平常使用到,之前我也写过这种类似的,但是说效果不是很好,因为我之前是使用的脚本去查询的服务器上面的信息.
这个类的话,在日志监控或者效率监控也可以使用到.
前面基本上都是在对性能参数的赋值,然后获取到了一个 List<Configuration> taskConfigs ,将这个list转成了map,然后再循环中开多线程执行任务.
datax源码解析-启动类分析
写在前面
此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。
我所使用的任务模版的json文件是:
"job":
"content":[
"reader":
"name":"mysqlreader",
"parameter":
"column":[
"id",
"name",
"age"
],
"connection":[
"jdbcUrl":[
"jdbc:mysql://127.0.0.1:3306/test"
],
"table":[
"t_datax_test"
]
],
"password":"11111111",
"username":"root"
,
"writer":
"name":"mysqlwriter",
"parameter":
"column":[
"id",
"name",
"age"
],
"connection":[
"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2",
"table":[
"t_datax_test"
]
],
"password":"11111111",
"username":"root"
],
"setting":
"speed":
"channel":"2"
另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。
启动类分析
datax的启动类是com.alibaba.datax.core.Engine
,通过main方法启动datax进程。
public static void main(String[] args) throws Exception
int exitCode = 0;
try
Engine.entry(args);
catch (Throwable e)
exitCode = 1;
LOG.error("\\n\\n经DataX智能分析,该任务最可能的错误原因是:\\n" + ExceptionTracker.trace(e));
...
继续看entry方法,
public static void entry(final String[] args) throws Throwable
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
//datax运行目录/xxx.json
String jobPath = cl.getOptionValue("job");
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
Configuration configuration = ConfigParser.parse(jobPath);
long jobId;
if (!"-1".equalsIgnoreCase(jobIdString))
jobId = Long.parseLong(jobIdString);
else
// only for dsc & ds & datax 3 update
String dscJobUrlPatternString = "/instance/(\\\\d1,)/config.xml";
String dsJobUrlPatternString = "/inner/job/(\\\\d1,)/config";
String dsTaskGroupUrlPatternString = "/inner/job/(\\\\d1,)/taskGroup/";
List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
dsJobUrlPatternString, dsTaskGroupUrlPatternString);
jobId = parseJobIdFromUrl(patternStringList, jobPath);
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
if (!isStandAloneMode && jobId == -1)
// 如果不是 standalone 模式,那么 jobId 一定不能为-1
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
//打印vmInfo
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null)
LOG.info(vmInfo.toString());
LOG.info("\\n" + Engine.filterJobConfiguration(configuration) + "\\n");
LOG.debug(configuration.toJSON());
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
//完成配置初始化后该方法将实例化本身并调用其start方法
engine.start(configuration);
首先是解析我们运行datax制定的运行参数,比如我在idea里给的配置是
-mode
standalone
-jobid
-1
-job
/Users/malu/Documents/code/idea_study/DataX/core/target/datax/job/mysql2mysql.json
那自然的,jobPath的值就是/Users/malu/Documents/code/idea_study/DataX/core/target/datax/job/mysql2mysql.json
,jobIdString的值是-1,RUNTIME_MODE的值是standalone
。
这几个关键的变量值明确之后,下面的流程明确了。
接着看一个比较重要的方法,ConfigParser.parse
,这个方法返回的是Configuration
类的实例,这个类在datax里非常重要,所有的配置信息都由它来管理,相当于大管家的角色。我后面打算专门写一篇介绍这个类。
/**
* 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回
*/
public static Configuration parse(final String jobPath)
//首先从任务配置文件解析基本的配置,包括reader、writer的信息,channel的数量等
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
//合并datax本身的一些配置,主要是在core.json文件里,比如限速的一些配置等
configuration.merge(
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
// todo config优化,只捕获需要的plugin
//reader plugin的名字,比如mysql是mysqlreader
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
//writer plugin的名字,比如mysql是mysqlwriter
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName))
pluginList.add(preHandlerName);
if(StringUtils.isNotEmpty(postHandlerName))
pluginList.add(postHandlerName);
try
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
...
注释写得很清楚了。
VMInfo里面放的是电脑本身的一些配置信息,这里不表。
接着是filterJobConfiguration
方法,
public static String filterJobConfiguration(final Configuration configuration)
//clone一份,因为后面会修改
Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
Configuration jobContent = jobConfWithSetting.getConfiguration("content");
//过滤敏感信息,比如password
filterSensitiveConfiguration(jobContent);
jobConfWithSetting.set("content",jobContent);
//格式化json字符串显示
return jobConfWithSetting.beautify();
这里也没啥好说的,都是一些基本操作。然后进入start
方法,
/* check job model (job/task) first */
public void start(Configuration allConf)
// 绑定column转换信息
ColumnCast.bind(allConf);
/**
* 初始化PluginLoader,可以获取各种插件配置
*/
LoadUtil.bind(allConf);
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
//JobContainer会在schedule后再行进行设置和调整值
int channelNumber =0;
AbstractContainer container;
...
//缺省打开perfTrace
boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
//standlone模式的datax shell任务不进行汇报
if(instanceId == -1)
perfReportEnable = false;
int priority = 0;
try
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
catch (NumberFormatException e)
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
/**
* 有两个实现:JobContainer和TaskGroupContainer
* 从配置看,基本上都是JobContainer,所以主要分析它
*/
container.start();
注释也比较清楚了,这里说明一点就是PerfTrace
类,它是一个追踪性能的类,也就是datax在执行任务的时候记录一些指标,比如传输了多少数据,耗时多少等。下面是一个示例,它是datax在执行完一个任务后其中一部分打印内容:
2021-11-28 09:11:32.532 [job-0] INFO StandAloneJobContainerCommunicator - Total 5 records, 39 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
container.start方法就进入JobContainer内部了,放在下一篇文章讲吧。
以上是关于datax分析与思考(一)的主要内容,如果未能解决你的问题,请参考以下文章