datax源码解析-任务拆分机制详解
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了datax源码解析-任务拆分机制详解相关的知识,希望对你有一定的参考价值。
datax源码解析-任务拆分机制详解
写在前面
此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。
本文我们来看看datax的任务拆分机制。
正文
先来看一幅图,
主要是要通过这幅图,理解datax中关于job和task的关系以及概念。
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
我们这篇文章其实就是关注的第二个步骤,拆分task。
任务拆分的入口函数是com.alibaba.datax.core.job.JobContainer#split
,我们来一点点分析这个方法。
//计算needChannelNumber
this.adjustChannelNumber();
if (this.needChannelNumber <= 0)
this.needChannelNumber = 1;
//切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
...
首先是计算needChannelNumber
这个变量,这个变量是后面执行具体拆分成task的依据。adjustChannelNumber
方法如下:
private void adjustChannelNumber()
int needChannelNumberByByte = Integer.MAX_VALUE;
int needChannelNumberByRecord = Integer.MAX_VALUE;
//是否指定了字节限速,来自任务配置文件
boolean isByteLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
if (isByteLimit)
//全局的限速字节数
long globalLimitedByteSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0)
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
//计算channel的数量
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
//是否指定了记录数量限流
boolean isRecordLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
if (isRecordLimit)
long globalLimitedRecordSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
Long channelLimitedRecordSpeed = this.configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0)
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
needChannelNumberByRecord =
(int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
// 取较小值
this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
needChannelNumberByByte : needChannelNumberByRecord;
// 如果从byte或record上设置了needChannelNumber则退出
if (this.needChannelNumber < Integer.MAX_VALUE)
return;
//是否直接指定了channel数量
boolean isChannelLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
if (isChannelLimit)
this.needChannelNumber = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
LOG.info("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
return;
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"Job运行速度必须设置");
注释写得比较详细了,总结下该方法的逻辑是,如果指定字节数限流,则据此计算并发数目A。如果指定记录数限流,则据此计算一个并发数目B。再取A和B两者中最小值作为needChannelNumber变量的值。如果两者限流都没指定,则看是否配置文件指定了channel并发数目。配置的示例是这样的:
"core":
"transport" :
"channel":
"speed":
"record": 100,
"byte": 100
,
"job":
"setting":
"speed":
"record": 5000,
"byte": 10000,
"channel" : 1
或者直接指定了channel数量:
"job":
"setting":
"speed":
"channel":"2"
继续看split代码,
//切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
//拆分的任务数量
int taskNumber = readerTaskConfigs.size();
//先拆reader,再拆writer
List<Configuration> writerTaskConfigs = this
.doWriterSplit(taskNumber);
...
这里似乎有点奇怪,为啥reader拆分传入的是needChannelNumber
,而writer拆分入参是taskNumber
。这是因为datax的执行逻辑就是,必须先切分Reader,然后Writer是根据Reader切分后的数目进行切分的。这个仔细想想也可以理解,毕竟传输的源头是reader,根据reader进行分工是自然的。
深入到doReaderSplit
方法继续看,
private List<Configuration> doReaderSplit(int adviceNumber)
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
//内部还是调用插件的split
List<Configuration> readerSlicesConfigs =
this.jobReader.split(adviceNumber);
if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0)
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
"reader切分的task数目不能小于等于0");
LOG.info("DataX Reader.Job [] splits to [] tasks.",
this.readerPluginName, readerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return readerSlicesConfigs;
没啥东西,因为委托给了插件自己的split方法进行拆分,这里以mysql为例,最终调用的是com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil#doSplit
方法,来看下,
public static List<Configuration> doSplit(
Configuration originalSliceConfig, int adviceNumber)
//默认isTableMode是true
boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
int eachTableShouldSplittedNumber = -1;
if (isTableMode)
// adviceNumber这里是channel数量大小, 即datax并发task数量
// eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
//从配置文件获取列信息
String column = originalSliceConfig.getString(Key.COLUMN);
//从配置文件获取where设置,如果配置文件没有指定就是空
String where = originalSliceConfig.getString(Key.WHERE, null);
//数据库连接信息,这里仅指reader的连接信息
List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);
List<Configuration> splittedConfigs = new ArrayList<Configuration>();
for (int i = 0, len = conns.size(); i < len; i++)
Configuration sliceConfig = originalSliceConfig.clone();
Configuration connConf = Configuration.from(conns.get(i).toString());
String jdbcUrl = connConf.getString(Key.JDBC_URL);
sliceConfig.set(Key.JDBC_URL, jdbcUrl);
// 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
sliceConfig.remove(Constant.CONN_MARK);
Configuration tempSlice;
// 说明是配置的 table 方式
if (isTableMode)
// 已在之前进行了扩展和`处理,可以直接使用
List<String> tables = connConf.getList(Key.TABLE, String.class);
Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");
//要不要根据主键进一步拆分,如果配置文件没有指定就不需要拆分
String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);
//最终切分份数不一定等于 eachTableShouldSplittedNumber
boolean needSplitTable = eachTableShouldSplittedNumber > 1
&& StringUtils.isNotBlank(splitPk);
//是否需要对单表进行拆分
//当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分
if (needSplitTable)
if (tables.size() == 1)
//原来:如果是单表的,主键切分num=num*2+1
// splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
//eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾
//考虑其他比率数字?(splitPk is null, 忽略此长尾)
//eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
//为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数
// 最终task数为(channel/tableNum)向上取整*splitFactor
Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
// 尝试对每个表,切分为eachTableShouldSplittedNumber 份
for (String table : tables)
tempSlice = sliceConfig.clone();
tempSlice.set(Key.TABLE, table);
List<Configuration> splittedSlices = SingleTableSplitUtil
.splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
splittedConfigs.addAll(splittedSlices);
else
for (String table : tables)
tempSlice = sliceConfig.clone();
tempSlice.set(Key.TABLE, table);
String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
//sql的示例:select col1,col2,col3 from table1
tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
splittedConfigs.add(tempSlice);
else
// 说明是配置的 querySql 方式
List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);
// TODO 是否check 配置为多条语句??
for (String querySql : sqls)
tempSlice = sliceConfig.clone();
tempSlice.set(Key.QUERY_SQL, querySql);
splittedConfigs.add(tempSlice);
return splittedConfigs;
这个方法比较长,我加了比较详细的注释。其实就是先判断是否需要进行单表切分,当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分,拆分个数前面已经经过计算得出,如果不需要就是几张表开启几个并发。拆分之后会返回一个Configuration的List,每个Configuration代表原先总配置文件中需要同步的数据的一部分。并加入到总配置文件存储,为后续调用提供配置的支持。
然后继续看writer的拆分方法,最终调用的是com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#doSplit
方法,来看下,
public static List<Configuration> doSplit(Configuration simplifiedConf,
int adviceNumber)
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
int tableNumber = simplifiedConf.getInt(Constant.TABLE_NUMBER_MARK);
//处理单表的情况
if (tableNumber == 1)
//由于在之前的 master prepare 中已经把 table,jdbcUrl 提取出来,所以这里处理十分简单
for (int j = 0; j < adviceNumber; j++)
splitResultConfigs.add(simplifiedConf.clone());
return splitResultConfigs;
...
其中adviceNumber
传入的是根据reader切分的任务数,simplifiedConf是从配置文件获取的writer相关的配置。为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则会报错,
if (tableNumber != adviceNumber)
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("您的配置文件中的列配置信息有误. 您要写入的目的端的表个数是:%s , 但是根据系统建议需要切分的份数是:%s. 请检查您的配置并作出修改.",
tableNumber, adviceNumber));
拆分完reader和writer之后,接下来有一行代码:
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
这个是做什么的呢?我举个例子,我们定义任务配置的时候可以指定转换的规则,比如:
"job":
"setting":
"speed":
"channel": 2
,
"errorLimit":
"record": 10000,
"percentage": 1
,
"content": [
// 字段转换部分
"transformer": [
// 使用字段截取转换
"name": "dx_substr",
"parameter":
// 操作读取出来的record的第一列
"columnIndex": 0,
// 意思是截取第0到4个字符
"paras": ["0","4"]
],
...
如下图所示,在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。
最后是合并配置,方法是mergeReaderAndWriterTaskConfigs
,
private List<Configuration> mergeReaderAndWriterTaskConfigs(
List<Configuration> readerTasksConfigs,
List<Configuration> writerTasksConfigs,
List<Configuration> transformerConfigs)
//reader和writer切分的数量要相等
if (readerTasksConfigs.size() != writerTasksConfigs.size())
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
readerTasksConfigs.size(), writerTasksConfigs.size())
);
List<Configuration> contentConfigs = new ArrayList<Configuration>();
for (int i = 0; i < readerTasksConfigs.size(); i++)
Configuration taskConfig = Configuration.newDefault();
//reader相关的配置
taskConfig.set(CoreConstant.JOB_READER_NAME,
this.readerPluginName);
taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
readerTasksConfigs.get(i));
//writer相关的配置
taskConfig.set(CoreConstant.JOB_WRITER_NAME,
this.writerPluginName);
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
writerTasksConfigs.get(i));
//transform相关的配置,可以为空
if(transformerConfigs!=null && transformerConfigs.size()>0)
taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
taskConfig.set(CoreConstant.TASK_ID, i);
contentConfigs.add(taskConfig);
return contentConfigs;
这个其实就是把任务整合后输出,输出的配置文件可以在task中使用。
参考:
- https://github.com/alibaba/DataX/blob/master/introduction.md
- https://www.jianshu.com/p/6b4173d3fc74
以上是关于datax源码解析-任务拆分机制详解的主要内容,如果未能解决你的问题,请参考以下文章