An analysis of DataX's launcher script datax.py

Posted by 犀牛饲养员

In a production environment, DataX jobs are launched through datax.py, like this:

$ python datax.py job/{YOUR_JOB.json}

This post walks through the source code of datax.py.

We start from the main function and follow the program's execution flow step by step.

if __name__ == "__main__":
    printCopyright()    # print the copyright/banner information
    parser = getOptionParser()  # build the option parser
    options, args = parser.parse_args(sys.argv[1:])  # parse the command-line arguments
    # if both a reader and a writer were passed in, generate a sample job JSON
    # template for them and exit
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)  # assemble the java start command
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)  # start the java child process that actually runs the job
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

I have added comments to the code above, so the steps are easy to follow. There are four main steps:

  • 1. Print the DataX copyright/banner information
  • 2. Build the option parser and parse the arguments
  • 3. Assemble the start command
  • 4. Start the java child process

The following sections walk through these four steps in turn.

Printing the copyright information

This one is trivial, so I will skip it.

Building the option parser and parsing the arguments

def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser

The parser is built with Python's built-in OptionParser (from the standard optparse module). The usage string tells us the command is invoked as

datax.py [options] job-url-or-path
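
As a quick, self-contained illustration (a sketch, not DataX code) of how optparse turns such a command line into the (options, args) pair that main works with, here is a small demo that mirrors two of the options defined in getOptionParser above, with made-up argument values:

# Minimal optparse demo; the option definitions mirror -m/--mode and --jobid above.
from optparse import OptionParser

parser = OptionParser(usage="usage: %prog [options] job-url-or-path")
parser.add_option("-m", "--mode", action="store", default="standalone")
parser.add_option("--jobid", dest="jobid", action="store", default="-1")

options, args = parser.parse_args(["-m", "local", "--jobid", "42", "job/job.json"])
print(options.mode)   # -> local
print(options.jobid)  # -> 42
print(args)           # -> ['job/job.json']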

The individual options are added with add_option in the getOptionParser code above. For example, we can run something like:

python datax.py -r txtReader -w txtFileWriter

Note that this example does not specify a json file: when both -r and -w are given, datax.py generates a sample job JSON template for the chosen reader/writer plugins (and prints links to their documentation on GitHub). That is exactly what this code handles:

if options.reader is not None and options.writer is not None:
    generateJobConfigTemplate(options.reader, options.writer)
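
generateJobConfigTemplate itself is not shown in this post. Roughly speaking (paraphrased from the datax.py in the DataX repository, so paths and field names may differ between versions), it builds an empty job skeleton, fills the reader/writer sections from each plugin's local plugin_job_template.json under the DataX installation, and prints the result so you can save it as your job file. A condensed sketch of that idea:

# Condensed sketch of generateJobConfigTemplate (not the verbatim source; the real
# function also prints links to each plugin's documentation on GitHub and handles
# missing-template errors).
import json
import os

# assumption: datax.py lives in ${DATAX_HOME}/bin, so DATAX_HOME is the directory above bin/
DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

def generateJobConfigTemplate(reader, writer):
    jobTemplate = {
        "job": {
            "setting": {"speed": {"channel": ""}},
            "content": [{"reader": {}, "writer": {}}]
        }
    }
    readerPath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    writerPath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    with open(readerPath) as f:
        jobTemplate["job"]["content"][0]["reader"] = json.load(f)
    with open(writerPath) as f:
        jobTemplate["job"]["content"][0]["writer"] = json.load(f)
    # print the merged skeleton; save it as a json file and pass it back to datax.py
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))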

Building the start command

def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print 'local ip: ', getLocalIp()

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL, or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)

The flow is as follows:

  1. Handle the JVM parameters
  2. Handle the DataX parameters (the job json to run, log file name, etc.)
  3. Assemble the final java command from a template (see the sketch below)
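
The last step uses Python's string.Template: ENGINE_COMMAND (defined with the other constants at the top of datax.py) is a template for the full java command line, and substitute() fills in its ${...} placeholders from commandMap. A simplified stand-in for that constant, with illustrative values (the real one also splices in a long list of -D system properties and the DataX classpath), looks like this:

from string import Template

# Simplified stand-in for the ENGINE_COMMAND constant in datax.py
ENGINE_COMMAND = ("java -server ${jvm} -classpath ${classpath} ${params} "
                  "com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}")

# Illustrative values; in datax.py these come from buildStartCommand
commandMap = {
    "jvm": "-Xms1g -Xmx1g",
    "classpath": "/opt/datax/lib/*",
    "params": "-Dlog.file.name=job_job_json",
    "mode": "standalone",
    "jobid": "-1",
    "job": "/opt/datax/job/job.json",
}
print(Template(ENGINE_COMMAND).substitute(**commandMap))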

You can print the final command by uncommenting

print startCommand

and take a look at it; it is just a standard java command, similar to this:

    # java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\\xxxx\\github\\DataX\\target\\datax\\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\\xxxx\\github\\DataX\\target\\datax\\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\\xxxx\\github\\DataX\\target\\datax\\datax -Dlogback.configurationFile=D:\\xxxx\\github\\DataX\\target\\datax\\datax/conf/logback.xml -classpath D:\\xxxx\\github\\DataX\\target\\datax\\datax/lib/*  -Dlog.file.name=x\\datax\\job\\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\\xxxx\\github\\DataX\\target\\datax\\datax\\job\\job.json
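
Step 4 is then simply running this command as a child process via subprocess.Popen(startCommand, shell=True). The register_signal() call in main exists so that killing datax.py (for example with Ctrl-C) also takes the java child down with it. A minimal sketch of that idea (not the exact DataX implementation, which works on the whole process group and skips signal handling on Windows):

import signal
import sys

child_process = None  # set in main() right after subprocess.Popen

def suicide(signum, frame):
    # Terminate the java child if it is still running, then exit with a signal-style code.
    if child_process is not None and child_process.poll() is None:
        child_process.terminate()
    sys.exit(128 + signum)

def register_signal():
    # Route the common "please stop" signals to the handler above.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, suicide)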
