Hive一条SQL的旅行

Posted 笨小孩撸代码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive一条SQL的旅行相关的知识,希望对你有一定的参考价值。

从今天开始我们从hive源码跟踪一下一条sql语句在hive执行引擎到底做了哪些事情。 

整个hive程序的入口是在包org.apache.hadoop.hive.cli下面的 

CliDriver 类 

下面这个就是main 方法

public static void main(String[] args) throws Exception {

  int ret = new CliDriver().run(args);

  System.exit(ret);

}


main 方法里面调用了CliDriver里面的run方法, 

这个run 函数最后返回了executeDriver方法

public  int run(String[] args) throws Exception {

// 省略。。。。。

//这里做一个判断,  判断你用 hive -e  还是 hive -f 等等参数

    try {

    //这里主要跟踪return这段代码,

      return executeDriver(ss, conf, oproc);

    } finally {

      ss.resetThreadName();

      ss.close();

    }

  }


跟踪executeDriver方法。从这个方法我们可以看到hive在这里将语句根据“;”拆分成多条语句, 拆分后的语句都会执行这条语句 

ret = cli.processLine(line, true)

private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)

      throws Exception {


  //省略。。。

  //这里也是做了一个执行前的判断。主要分析下面这段代码

    String line;

    int ret = 0;

    String prefix = "";

    String curDB = getFormattedDb(conf, ss);

    String curPrompt = prompt + curDB;

    String dbSpaces = spacesForString(curDB);


    while ((line = reader.readLine(curPrompt + "> ")) != null) {

      if (!prefix.equals("")) {

        prefix += '\n';

      }

      if (line.trim().startsWith("--")) {

        continue;

      }

      if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {

        line = prefix + line;

        ret = cli.processLine(line, true);

        prefix = "";

        curDB = getFormattedDb(conf, ss);

        curPrompt = prompt + curDB;

        dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);

      } else {

        prefix = prefix + line;

        curPrompt = prompt2 + dbSpaces;

        continue;

      }

    }


    return ret;

  }


继续跟踪processLine 方法。这里主要调用了processCmd 方法 

processLine

public int processLine(String line, boolean allowInterrupting) {

   //省略

   //这里主要有接受中断命令的请求,当用户发出 ctrl+C的命令的时候,会kill任务

//省略

        ret = processCmd(command);

   //省略

  }


跟踪processCmd 方法,这个里面做了一些判断,判断开头是不是 quit,exit这种退出关键字。判断是不是以source 或者!开头的。 

processCmd

 public int processCmd(String cmd) {

   //省略


    if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {

      //省略


    } else if (tokens[0].equalsIgnoreCase("source")) {

       //省略

    } else if (cmd_trimmed.startsWith("!")) {

        //省略

    }  else { // local mode

      try {

        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);

        ret = processLocalCmd(cmd, proc, ss);

      } 

      //省略

    }


   //省略

  }


从这里我们可以到这里有2条重要的sql语句。其实从这里我们可以看到最后会执行的语句是processLocalCmd。但是这里我们要弄清楚proc这个变量是一个什么类型。

 CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);

 ret = processLocalCmd(cmd, proc, ss);


跟踪CommandProcessor。我们可以看到有一个 return new Driver()。这个我们大概的知道这个返回和Driver有关联的类型,CommandProcessor这个类型应该是Driver的上一级 

CommandProcessor

public static CommandProcessor get(String[] cmd, HiveConf conf)

      throws SQLException {

    CommandProcessor result = getForHiveCommand(cmd, conf);

    if (result != null) {

      return result;

    }

    if (isBlank(cmd[0])) {

      return null;

    } else {

      if (conf == null) {

        return new Driver();

      }

      Driver drv = mapDrivers.get(conf);

      if (drv == null) {

        drv = new Driver();

        mapDrivers.put(conf, drv);

      } else {

        drv.resetQueryState();

      }

      drv.init();

      return drv;

    }

  }


接下来我们继续跟踪processLocalCmd。从这里我们可以看到这里调用了Driver的run方法 

processLocalCmd

int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {

    int tryCount = 0;

    boolean needRetry;

    int ret = 0;


    do {

      try {

        needRetry = false;

        if (proc != null) {

          if (proc instanceof Driver) { // 这里的proc是Driver类型

            Driver qp = (Driver) proc;

            PrintStream out = ss.out;

            long start = System.currentTimeMillis();

            if (ss.getIsVerbose()) {

              out.println(cmd);

            }


            qp.setTryCount(tryCount);

            ret = qp.run(cmd).getResponseCode();  //这里我们知道直接调用了Driver的run方法

            if (ret != 0) {

              qp.close();

              return ret;

            }

        //省略

  }


这里我们跟踪一下Driver中的run方法,这里其实主要关注 

CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);


public CommandProcessorResponse run(String command)

      throws CommandNeedRetryException {

    return run(command, false);

  }


  public CommandProcessorResponse run()

      throws CommandNeedRetryException {

    return run(null, true);

  }


  public CommandProcessorResponse run(String command, boolean alreadyCompiled)

        throws CommandNeedRetryException {

    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);


    if(cpr.getResponseCode() == 0) {

      return cpr;

    }

    SessionState ss = SessionState.get();

    if(ss == null) {

      return cpr;

    }

    MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());

    if(!(mdf instanceof JsonMetaDataFormatter)) {

      return cpr;

    }

    //省略

  }


我们继续跟踪runInternal方法。这个方法里面做了一些状态的判断,后面又调用了compileInternal方法 

runInternal


private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)

      throws CommandNeedRetryException {

  //省略

    //下面这段是判断这个sql现在的状态

    stateLock.lock();

    try {

      if (alreadyCompiled) {

        if (driverState == DriverState.COMPILED) {

          driverState = DriverState.EXECUTING;

        } else {

          errorMessage = "FAILED: Precompiled query has been cancelled or closed.";

          console.printError(errorMessage);

          return createProcessorResponse(12);

        }

      } else {

        driverState = DriverState.COMPILING;

      }

    } finally {

      stateLock.unlock();

    }


      int ret;

      if (!alreadyCompiled) {

      // compile internal will automatically reset the perf logger

        ret = compileInternal(command, true);

        // then we continue to use this perf logger

        perfLogger = SessionState.getPerfLogger();

        if (ret != 0) {

          return createProcessorResponse(ret);

        }

      } 

     //省略

  }


跟踪compileInternal方法,这个方法主要做了一个提交,调用了compile方法。 

compileInternal


private int compileInternal(String command, boolean deferClose) {

  //省略

    try {

      ret = compile(command, true, deferClose);

    } finally {

      compileLock.unlock();

    }

//省略

  }


好我们看到已经调用了compile 方法,这个会真正的进入hive,compile 这个会对sql进行解析和分析,这个我们下一篇在细说hive中sql是怎么解析和优化的


以上是关于Hive一条SQL的旅行的主要内容,如果未能解决你的问题,请参考以下文章

免费旅行&学宫廷泰菜︱没上过Hive的旅行家不是好厨师

Hive游学及义工旅行:年轻人不如去闯|旅讯8点正

旅行路线

Hive x Uber | 一键呼叫任意门,免费来一场说走就走的旅行!

[BZOJ2388]旅行规划

TSP问题(旅行商问题)[分支限界法]