datax源码解析-任务拆分机制详解

Posted 犀牛饲养员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了datax源码解析-任务拆分机制详解相关的知识,希望对你有一定的参考价值。

datax源码解析-任务拆分机制详解

写在前面

此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。

本文我们来看看datax的任务拆分机制。

正文

先来看一幅图,

主要是要通过这幅图,理解datax中关于job和task的关系以及概念。

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

我们这篇文章其实就是关注的第二个步骤,拆分task。

任务拆分的入口函数是com.alibaba.datax.core.job.JobContainer#split,我们来一点点分析这个方法。

//计算needChannelNumber
        this.adjustChannelNumber();

        if (this.needChannelNumber <= 0) 
            this.needChannelNumber = 1;
        

        //切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个
        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);

        ...

首先是计算needChannelNumber这个变量,这个变量是后面执行具体拆分成task的依据。adjustChannelNumber方法如下:

private void adjustChannelNumber() 
        int needChannelNumberByByte = Integer.MAX_VALUE;
        int needChannelNumberByRecord = Integer.MAX_VALUE;

        //是否指定了字节限速,来自任务配置文件
        boolean isByteLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
        if (isByteLimit) 
            //全局的限速字节数
            long globalLimitedByteSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);

            // 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
            Long channelLimitedByteSpeed = this.configuration
                    .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
            if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) 
                throw DataXException.asDataXException(
                        FrameworkErrorCode.CONFIG_ERROR,
                        "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
            

            //计算channel的数量
            needChannelNumberByByte =
                    (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
            needChannelNumberByByte =
                    needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
            LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
        

        //是否指定了记录数量限流
        boolean isRecordLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
        if (isRecordLimit) 
            long globalLimitedRecordSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);

            Long channelLimitedRecordSpeed = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
            if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) 
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                        "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
            

            needChannelNumberByRecord =
                    (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
            needChannelNumberByRecord =
                    needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
            LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
        

        // 取较小值
        this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
                needChannelNumberByByte : needChannelNumberByRecord;

        // 如果从byte或record上设置了needChannelNumber则退出
        if (this.needChannelNumber < Integer.MAX_VALUE) 
            return;
        

        //是否直接指定了channel数量
        boolean isChannelLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
        if (isChannelLimit) 
            this.needChannelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

            LOG.info("Job set Channel-Number to " + this.needChannelNumber
                    + " channels.");

            return;
        

        throw DataXException.asDataXException(
                FrameworkErrorCode.CONFIG_ERROR,
                "Job运行速度必须设置");
    

注释写得比较详细了,总结下该方法的逻辑是,如果指定字节数限流,则据此计算并发数目A。如果指定记录数限流,则据此计算一个并发数目B。再取A和B两者中最小值作为needChannelNumber变量的值。如果两者限流都没指定,则看是否配置文件指定了channel并发数目。配置的示例是这样的:


    "core": 
       "transport" : 
          "channel": 
             "speed": 
                "record": 100,
                "byte": 100
             
          
       
    ,
    "job": 
      "setting": 
        "speed": 
          "record": 5000,
          "byte": 10000,
          "channel" : 1
        
      
    

或者直接指定了channel数量:

"job": 
      "setting":
            "speed":
                "channel":"2"
            
        
    

继续看split代码,

//切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个
        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);

        //拆分的任务数量
        int taskNumber = readerTaskConfigs.size();
        //先拆reader,再拆writer
        List<Configuration> writerTaskConfigs = this
                .doWriterSplit(taskNumber);
    
        ...

这里似乎有点奇怪,为啥reader拆分传入的是needChannelNumber,而writer拆分入参是taskNumber。这是因为datax的执行逻辑就是,必须先切分Reader,然后Writer是根据Reader切分后的数目进行切分的。这个仔细想想也可以理解,毕竟传输的源头是reader,根据reader进行分工是自然的。

深入到doReaderSplit方法继续看,

private List<Configuration> doReaderSplit(int adviceNumber) 
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));
        //内部还是调用插件的split
        List<Configuration> readerSlicesConfigs =
                this.jobReader.split(adviceNumber);
        if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) 
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    "reader切分的task数目不能小于等于0");
        
        LOG.info("DataX Reader.Job [] splits to [] tasks.",
                this.readerPluginName, readerSlicesConfigs.size());
        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return readerSlicesConfigs;
    

没啥东西,因为委托给了插件自己的split方法进行拆分,这里以mysql为例,最终调用的是com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil#doSplit方法,来看下,

public static List<Configuration> doSplit(
            Configuration originalSliceConfig, int adviceNumber) 
        //默认isTableMode是true
        boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
        int eachTableShouldSplittedNumber = -1;
        if (isTableMode) 
            // adviceNumber这里是channel数量大小, 即datax并发task数量
            // eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
            eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                    adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
        

        //从配置文件获取列信息
        String column = originalSliceConfig.getString(Key.COLUMN);
        //从配置文件获取where设置,如果配置文件没有指定就是空
        String where = originalSliceConfig.getString(Key.WHERE, null);

        //数据库连接信息,这里仅指reader的连接信息
        List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);

        List<Configuration> splittedConfigs = new ArrayList<Configuration>();

        for (int i = 0, len = conns.size(); i < len; i++) 
            Configuration sliceConfig = originalSliceConfig.clone();

            Configuration connConf = Configuration.from(conns.get(i).toString());
            String jdbcUrl = connConf.getString(Key.JDBC_URL);
            sliceConfig.set(Key.JDBC_URL, jdbcUrl);

            // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
            sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));

            sliceConfig.remove(Constant.CONN_MARK);

            Configuration tempSlice;

            // 说明是配置的 table 方式
            if (isTableMode) 
                // 已在之前进行了扩展和`处理,可以直接使用
                List<String> tables = connConf.getList(Key.TABLE, String.class);

                Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");

                //要不要根据主键进一步拆分,如果配置文件没有指定就不需要拆分
                String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);

                //最终切分份数不一定等于 eachTableShouldSplittedNumber
                boolean needSplitTable = eachTableShouldSplittedNumber > 1
                        && StringUtils.isNotBlank(splitPk);
                //是否需要对单表进行拆分
                //当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分
                if (needSplitTable) 
                    if (tables.size() == 1) 
                        //原来:如果是单表的,主键切分num=num*2+1
                        // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
                        //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾
                        
                        //考虑其他比率数字?(splitPk is null, 忽略此长尾)
                        //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;

                        //为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数
                        // 最终task数为(channel/tableNum)向上取整*splitFactor
                        Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
                        eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
                    
                    // 尝试对每个表,切分为eachTableShouldSplittedNumber 份
                    for (String table : tables) 
                        tempSlice = sliceConfig.clone();
                        tempSlice.set(Key.TABLE, table);

                        List<Configuration> splittedSlices = SingleTableSplitUtil
                                .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);

                        splittedConfigs.addAll(splittedSlices);
                    
                 else 
                    for (String table : tables) 
                        tempSlice = sliceConfig.clone();
                        tempSlice.set(Key.TABLE, table);
                        String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
                        //sql的示例:select col1,col2,col3 from table1
                        tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
                        splittedConfigs.add(tempSlice);
                    
                
             else 
                // 说明是配置的 querySql 方式
                List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);

                // TODO 是否check 配置为多条语句??
                for (String querySql : sqls) 
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.QUERY_SQL, querySql);
                    splittedConfigs.add(tempSlice);
                
            

        

        return splittedConfigs;
    

这个方法比较长,我加了比较详细的注释。其实就是先判断是否需要进行单表切分,当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分,拆分个数前面已经经过计算得出,如果不需要就是几张表开启几个并发。拆分之后会返回一个Configuration的List,每个Configuration代表原先总配置文件中需要同步的数据的一部分。并加入到总配置文件存储,为后续调用提供配置的支持。

然后继续看writer的拆分方法,最终调用的是com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#doSplit方法,来看下,

public static List<Configuration> doSplit(Configuration simplifiedConf,
                                              int adviceNumber) 

        List<Configuration> splitResultConfigs = new ArrayList<Configuration>();

        int tableNumber = simplifiedConf.getInt(Constant.TABLE_NUMBER_MARK);

        //处理单表的情况
        if (tableNumber == 1) 
            //由于在之前的  master prepare 中已经把 table,jdbcUrl 提取出来,所以这里处理十分简单
            for (int j = 0; j < adviceNumber; j++) 
                splitResultConfigs.add(simplifiedConf.clone());
            

            return splitResultConfigs;
        
        ...

其中adviceNumber传入的是根据reader切分的任务数,simplifiedConf是从配置文件获取的writer相关的配置。为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则会报错,

if (tableNumber != adviceNumber) 
            throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                    String.format("您的配置文件中的列配置信息有误. 您要写入的目的端的表个数是:%s , 但是根据系统建议需要切分的份数是:%s. 请检查您的配置并作出修改.",
                            tableNumber, adviceNumber));
        

拆分完reader和writer之后,接下来有一行代码:

List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

这个是做什么的呢?我举个例子,我们定义任务配置的时候可以指定转换的规则,比如:


"job": 
 "setting": 
   "speed": 
     "channel": 2
   ,
   "errorLimit": 
     "record": 10000,
     "percentage": 1
   
 ,
 "content": [
   
     // 字段转换部分
     "transformer": [
       
         // 使用字段截取转换
         "name": "dx_substr",
         "parameter": 
           // 操作读取出来的record的第一列
           "columnIndex": 0,
           // 意思是截取第0到4个字符
           "paras": ["0","4"]
         
       
     ],
     ...

如下图所示,在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。

最后是合并配置,方法是mergeReaderAndWriterTaskConfigs

private List<Configuration> mergeReaderAndWriterTaskConfigs(
            List<Configuration> readerTasksConfigs,
            List<Configuration> writerTasksConfigs,
            List<Configuration> transformerConfigs) 
        //reader和writer切分的数量要相等
        if (readerTasksConfigs.size() != writerTasksConfigs.size()) 
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
                            readerTasksConfigs.size(), writerTasksConfigs.size())
            );
        

        List<Configuration> contentConfigs = new ArrayList<Configuration>();
        for (int i = 0; i < readerTasksConfigs.size(); i++) 
            Configuration taskConfig = Configuration.newDefault();
            
            //reader相关的配置
            taskConfig.set(CoreConstant.JOB_READER_NAME,
                    this.readerPluginName);
            taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
                    readerTasksConfigs.get(i));
            
            //writer相关的配置
            taskConfig.set(CoreConstant.JOB_WRITER_NAME,
                    this.writerPluginName);
            taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
                    writerTasksConfigs.get(i));

            //transform相关的配置,可以为空
            if(transformerConfigs!=null && transformerConfigs.size()>0)
                taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
            

            taskConfig.set(CoreConstant.TASK_ID, i);
            contentConfigs.add(taskConfig);
        

        return contentConfigs;
    

这个其实就是把任务整合后输出,输出的配置文件可以在task中使用。


参考:

  • https://github.com/alibaba/DataX/blob/master/introduction.md
  • https://www.jianshu.com/p/6b4173d3fc74

以上是关于datax源码解析-任务拆分机制详解的主要内容,如果未能解决你的问题,请参考以下文章

datax源码解析-任务调度机制解析

datax源码解析-任务调度机制解析

datax源码解析-datax的hook机制解析

datax源码解析-datax的hook机制解析

datax源码解析-JobContainer的初始化阶段解析

datax源码解析-启动类分析