Reading the Hive Source Code: Why Big Data Development Can Do Without SQL, Java, or Scala

Posted by 虎鲸不是鱼




Preface

[This article is best suited to readers with some computer science background or about half a year of work experience. Setting a flag here: may the world see no more shallow SQL Boys.]

When it comes to big data development, the overwhelming majority of practitioners are SQL Boys, and I will not take arguments on this: big data mostly serves machine learning and statistical reporting, so plenty of people migrated over from Oracle database development knowing little more than a few SQL statements, and the rare one who can write spark.sql("some SQL string") in Python already counts as a talent among SQL Boys. Developers who can only handle structured tables at this most basic level are the butt of the familiar joke: the shallow SQL Boy. They understand nothing about big data itself, their thinking is stuck in the database era, and every big data component gets used as if it were an RDBMS. The technical level of this kind of business developer is, frankly, hard to admire.

There are also people who came from Java backend development. The transition takes some getting used to, but they can drive Spark and Flink fluently from a main method, hand-write JDBC, and do some light secondary development; these are the platform developers, and their technical level is a notch higher. If you write Java well, Scala does not take long to pick up either.
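Hand-writing JDBC against Hive really is only a few lines. Here is a minimal sketch, assuming a HiveServer2 instance at localhost:10000 and the hive-jdbc driver on the classpath; the host, port, credentials, and table names are placeholders, not anything from the original post:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HiveJdbcDemo {
    public static void main(String[] args) throws Exception {
        // Hive's JDBC driver; URL, user and password are placeholders for a real HiveServer2
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {
            // same effect as typing the DDL in the CLI, but driven from code
            stmt.execute("DROP TABLE IF EXISTS db_name.tb_name");
        }
    }
}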

But... none of this means big data can only be done with SQL, Java, or Scala. Anyone boxed in like that is not much better off than a SQL Boy.

I started out in embedded development, so I know perfectly well that C, C#, and C++ can do big data too.

Taking drop table in Hive, the most common data warehouse component in big data development, as an example, this article tries to show why big data development can do without SQL, Java, and Scala.

Why You Can Do Without SQL

Data is either structured or not. SQL can only handle a rather limited class of structured tables (RDBMS tables, well-formed CSV/TSV files, and the like); for most semi-structured and unstructured data (log files, audio, images) SQL is helpless. Old-school MapReduce had no SQL at all, and early versions of Spark and Flink were API-based; people got along fine in the pre-SQL days. The ever-friendlier SQL support in big data components came later, mainly to lower the barrier to entry so that SQL Boys could use big data technology too.
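As a quick illustration of working on raw, unstructured text without any SQL, here is a minimal Spark sketch in Java; the log path, the local master, and the "ERROR" pattern are made-up placeholders for this example only:

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class LogScanDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LogScanDemo")
                .master("local[*]")   // placeholder; point at a real cluster in practice
                .getOrCreate();
        // raw, unstructured log lines: no schema and no SQL involved
        Dataset<String> lines = spark.read().textFile("/tmp/app.log");  // hypothetical path
        long errorCount = lines.filter((FilterFunction<String>) l -> l.contains("ERROR")).count();
        System.out.println("ERROR lines: " + errorCount);
        spark.stop();
    }
}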

Shallow SQL Boys naturally only know this much:

drop table db_name.tb_name;

Normally the Hive table gets dropped, and their understanding stops at "Hive is a database".

A big data platform developer, on the other hand, knows to go look at Hive's Java API:

https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.1.3/api/index.html

and knows there is also this way of doing it:

package com.zhiyong;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

/**
 * @program: zhiyong_study
 * @description: MetaStore test
 * @author: zhiyong
 * @create: 2023-03-22 22:57
 **/
public class MetaStoreDemo {
    public static void main(String[] args) throws Exception {
        HiveConf hiveConf = new HiveConf();                              // picks up hive-site.xml from the classpath
        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        client.dropTable("db_name", "tb_name");                         // drop the table through the metastore client
    }
}

Calling the API drops the table just as well, so DDL is clearly not the only way. Through HiveMetaStoreClient you can also create tables and perform other operations.
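A minimal sketch of a few more HiveMetaStoreClient calls; the database and table names are placeholders, and the client reads hive-site.xml from the classpath just like the example above:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class MetaStoreReadDemo {
    public static void main(String[] args) throws Exception {
        HiveConf hiveConf = new HiveConf();
        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        // db_name / tb_name are placeholders
        System.out.println(client.getAllTables("db_name"));     // list the tables in a database
        if (client.tableExists("db_name", "tb_name")) {
            Table t = client.getTable("db_name", "tb_name");
            System.out.println(t.getSd().getLocation());         // storage location straight from the metadata
        }
        client.close();
    }
}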

Platform developers who understand the internals have an even blunter option: connect directly to the MySQL instance where Hive stores its metadata and run precise CRUD against the metadata tables.
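To keep this sketch safe it only reads the metadata. Assuming the stock metastore schema (the DBS and TBLS tables), a MySQL JDBC driver on the classpath, and placeholder connection details, a query that locates a table's metadata row might look like this; hand-written updates to these tables can easily corrupt the metastore, so none are shown:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MetastoreMysqlDemo {
    public static void main(String[] args) throws Exception {
        // URL, user and password are placeholders for the real metastore MySQL instance
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://metastore-host:3306/hive_metastore", "hive", "hive")) {
            // DBS / TBLS are the stock metastore schema tables for databases and tables
            String sql = "SELECT t.TBL_ID, t.TBL_NAME, t.TBL_TYPE "
                       + "FROM TBLS t JOIN DBS d ON t.DB_ID = d.DB_ID "
                       + "WHERE d.NAME = ? AND t.TBL_NAME = ?";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, "db_name");
                ps.setString(2, "tb_name");
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getLong(1) + " " + rs.getString(2) + " " + rs.getString(3));
                    }
                }
            }
        }
    }
}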

ETL and other processing of structured tables can be done entirely with Spark's DataFrame or Flink's DataStream APIs, pure API style: whatever SQL can express, Java and Scala can express, and what SQL cannot express, Java and Scala still can.
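For example, a minimal Spark DataFrame ETL sketch in Java with no SQL text anywhere; the table and column names are placeholders:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class DataFrameEtlDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DataFrameEtlDemo")
                .enableHiveSupport()          // read and write Hive tables without writing SQL strings
                .getOrCreate();
        // db_name.src_tb and db_name.agg_tb are placeholders
        Dataset<Row> src = spark.read().table("db_name.src_tb");
        Dataset<Row> agg = src.filter(col("event_time").isNotNull())
                              .groupBy(col("user_id"))
                              .agg(count(col("user_id")).alias("pv"));
        agg.write().mode(SaveMode.Overwrite).saveAsTable("db_name.agg_tb");
        spark.stop();
    }
}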

Apart from RDBMSs and the various repackaged products (thin wrappers around open-source Apache components), I honestly cannot think of a scenario where SQL is the only option.

That is enough to show that big data can do without SQL.

Why You Can Do Without Java

Hive is written in Java under the hood, but that does not mean Java is the only way to operate Hive. If your understanding is that shallow, you deserve to spend the rest of your career tuning parameters and calling APIs.

Finding the real entry point of dropTable

In the Hive 3.1.2 source, we can find the dropTable methods:

@Override
  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, null);
  }

  @Override
  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, boolean ifPurge) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, ifPurge);
  }

  @Override
  public void dropTable(String dbname, String name) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, true, true, null);
  }

  @Override
  public void dropTable(String catName, String dbName, String tableName, boolean deleteData,
                        boolean ignoreUnknownTable, boolean ifPurge) throws TException {
    //build new environmentContext with ifPurge;
    EnvironmentContext envContext = null;
    if(ifPurge) {
      Map<String, String> warehouseOptions;
      warehouseOptions = new HashMap<>();
      warehouseOptions.put("ifPurge", "TRUE");
      envContext = new EnvironmentContext(warehouseOptions);
    }
    dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext);

  }

There are several overloads with the same name, but they all end up calling the same method:

  /**
   * Drop the table and choose whether to: delete the underlying table data;
   * throw if the table doesn't exist; save the data in the trash.
   *
   * @param catName catalog name
   * @param dbname database name
   * @param name table name
   * @param deleteData
   *          delete the underlying data or just delete the table in metadata
   * @param ignoreUnknownTab
   *          don't throw if the requested table doesn't exist
   * @param envContext
   *          for communicating with thrift
   * @throws MetaException
   *           could not drop table properly
   * @throws NoSuchObjectException
   *           the table wasn't found
   * @throws TException
   *           a thrift communication error occurred
   * @throws UnsupportedOperationException
   *           dropping an index table is not allowed
   * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String,
   *      java.lang.String, boolean)
   */
  public void dropTable(String catName, String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    Table tbl;
    try {
      tbl = getTable(catName, dbname, name);
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
      return;
    }
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preDropTable(tbl);
    }
    boolean success = false;
    try {
      drop_table_with_environment_context(catName, dbname, name, deleteData, envContext);
      if (hook != null) {
        hook.commitDropTable(tbl, deleteData || (envContext != null && "TRUE".equals(envContext.getProperties().get("ifPurge"))));
      }
      success = true;
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
    } finally {
      if (!success && (hook != null)) {
        hook.rollbackDropTable(tbl);
      }
    }
  }

The method mainly fetches the table object, then runs the hook's preDropTable pre-commit followed by commitDropTable as the actual commit. On the surface, this 2PC-style flow looks quite rigorous.

It turns out that HiveMetaHook is just an interface:

package org.apache.hadoop.hive.metastore;

/**
 * HiveMetaHook defines notification methods which are invoked as part
 * of transactions against the metastore, allowing external catalogs
 * such as HBase to be kept in sync with Hive's metastore.
 *
 *<p>
 *
 * Implementations can use {@link MetaStoreUtils#isExternalTable} to
 * distinguish external tables from managed tables.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HiveMetaHook {

  public String ALTER_TABLE_OPERATION_TYPE = "alterTableOpType";

  public List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS");

  /**
   * Called before a table definition is removed from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   */
  public void preDropTable(Table table)
    throws MetaException;

  /**
   * Called after failure removing a table definition from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   */
  public void rollbackDropTable(Table table)
    throws MetaException;

  /**
   * Called after successfully removing a table definition from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   *
   * @param deleteData whether to delete data as well; this should typically
   * be ignored in the case of an external table
   */
  public void commitDropTable(Table table, boolean deleteData)
    throws MetaException;
}
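Before tracing which implementation is actually in play here, a purely hypothetical hook helps show how these callbacks are meant to be used; note that the full interface also declares the corresponding CREATE TABLE callbacks, which the excerpt above does not show, so this illustrative sketch implements them too:

import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;

// Hypothetical hook that only logs; real hooks (like the AccumuloStorageHandler below)
// use these callbacks to keep an external store in sync with the metastore.
public class LoggingMetaHook implements HiveMetaHook {
    @Override public void preCreateTable(Table table) throws MetaException {
        System.out.println("about to create " + table.getTableName());
    }
    @Override public void rollbackCreateTable(Table table) throws MetaException { }
    @Override public void commitCreateTable(Table table) throws MetaException { }

    @Override public void preDropTable(Table table) throws MetaException {
        System.out.println("about to drop " + table.getTableName());
    }
    @Override public void rollbackDropTable(Table table) throws MetaException { }
    @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException {
        System.out.println("dropped " + table.getTableName() + ", deleteData=" + deleteData);
    }
}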

Its inheritance hierarchy (class diagram omitted here):

It is obviously not this one:

package org.apache.hadoop.hive.metastore;

public abstract class DefaultHiveMetaHook implements HiveMetaHook {
  /**
   * Called after successfully INSERT [OVERWRITE] statement is executed.
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void commitInsertTable(Table table, boolean overwrite) throws MetaException;

  /**
   * called before commit insert method is called
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void preInsertTable(Table table, boolean overwrite) throws MetaException;

  /**
   * called in case pre commit or commit insert fail.
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
}

Nor can it possibly be this Mock class used for tests:

/**
 * Mock class used for unit testing.
 * {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2#testLockingOnInsertIntoNonNativeTables()}
 */
public class StorageHandlerMock extends DefaultStorageHandler {
}

So it is this class, AccumuloStorageHandler:

package org.apache.hadoop.hive.accumulo;

/**
 * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
 */
public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
    HiveStoragePredicateHandler {
}

But:

  @Override
  public void preDropTable(Table table) throws MetaException {
    // do nothing
  }

That "do nothing"!!! Words fail me. So much for the rigorous-looking 2PC flow.

So the actual entry point for dropping the table is:

  @Override
  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
    String tblName = getTableName(table);
    if (!isExternalTable(table)) {
      try {
        if (deleteData) {
          TableOperations tblOpts = connectionParams.getConnector().tableOperations();
          if (tblOpts.exists(tblName)) {
            tblOpts.delete(tblName);
          }
        }
      } catch (AccumuloException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (AccumuloSecurityException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (TableNotFoundException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      }
    }
  }

Taking the simplest case, a managed table whose data needs to be deleted, what actually gets called is this delete method. And TableOperations is, again, an interface:

package org.apache.accumulo.core.client.admin;

/**
 * Provides a class for administering tables
 *
 */

public interface TableOperations {
  /**
   * Delete a table
   *
   * @param tableName
   *          the name of the table
   * @throws AccumuloException
   *           if a general error occurs
   * @throws AccumuloSecurityException
   *           if the user does not have permission
   * @throws TableNotFoundException
   *           if the table does not exist
   */
  void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
}

Its inheritance hierarchy is simple (diagram omitted):

so the implementation class is, of course, this one:

package org.apache.accumulo.core.client.impl;

public class TableOperationsImpl extends TableOperationsHelper {
  @Override
  public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    checkArgument(tableName != null, "tableName is null");

    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
    Map<String,String> opts = new HashMap<>();

    try {
      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE, args, opts);
    } catch (TableExistsException e) {
      // should not happen
      throw new AssertionError(e);
    }

  }
}

So the real entry point is the doTableFateOperation method here. In the FateOperation enum, TABLE_DELETE = 2.

Tracing the call chain of doTableFateOperation

It jumps to:

  private void doTableFateOperation(String tableOrNamespaceName, Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
      List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException {
    try {
      doFateOperation(op, args, opts, tableOrNamespaceName);
    }
    // ... (exception-translation catch blocks not reproduced in this excerpt)
  }

which continues on to:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName) throws AccumuloSecurityException,
      TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
    return doFateOperation(op, args, opts, tableOrNamespaceName, true);
  }

and on again to:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName, boolean wait)
      throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
    Long opid = null;

    try {
      opid = beginFateOperation();
      executeFateOperation(opid, op, args, opts, !wait);
      if (!wait) {
        opid = null;
        return null;
      }
      String ret = waitForFateOperation(opid);
      return ret;
    } catch (ThriftSecurityException e) {
      switch (e.getCode()) {
        case TABLE_DOESNT_EXIST:
          throw new TableNotFoundException(null, tableOrNamespaceName, "Target table does not exist");
        case NAMESPACE_DOESNT_EXIST:
          throw new NamespaceNotFoundException(null, tableOrNamespaceName, "Target namespace does not exist");
        default:
          String tableInfo = Tables.getPrintableTableInfoFromName(context.getInstance(), tableOrNamespaceName);
          throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
      }
    } catch (ThriftTableOperationException e) {
      switch (e.getType()) {
        case EXISTS:
          throw new TableExistsException(e);
        case NOTFOUND:
          throw new TableNotFoundException(e);
        case NAMESPACE_EXISTS:
          throw new NamespaceExistsException(e);
        case NAMESPACE_NOTFOUND:
          throw new NamespaceNotFoundException(e);
        case OFFLINE:
          throw new TableOfflineException(context.getInstance(), Tables.getTableId(context.getInstance(), tableOrNamespaceName));
        default:
          throw new AccumuloException(e.description, e);
      }
    } catch (Exception e) {
      throw new AccumuloException(e.getMessage(), e);
    } finally {
      Tables.clearCache(context.getInstance());
      // always finish table op, even when exception
      if (opid != null)
        try {
          finishFateOperation(opid);
        } catch (Exception e) {
          log.warn(e.getMessage(), e);
        }
    }
  }

Something curious shows up here: the method catches a whole pile of Thrift-related exceptions. Continuing on:

  // This method is for retrying in the case of network failures; anything else it passes to the caller to deal with
  private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)
      throws ThriftSecurityException, TException, ThriftTableOperationException {
    while (true) {
      MasterClientService.Iface client = null;
      try {
        client = MasterC

Hive Source Code: CliDriver HQL Statement Splitting

Hive version: 2.3.4

1. HQL File Processing

1.1 Entry Point

As covered in the previous article, Hive 源码解读 CliDriver HQL 读取与参数解析 (CliDriver HQL reading and argument parsing), when the Hive CLI is started with the -f <query-file> option, processFile is called to execute the one or more HQL statements in that file:

try {
  if (ss.fileName != null) {
    return cli.processFile(ss.fileName);
  }
} catch (FileNotFoundException e) {
  ...
}
Besides the -f <query-file> option, the -i <filename> option also goes through processFile:

public void processInitFiles(CliSessionState ss) throws IOException {
    ...
    // call processFile on each init file
    for (String initFile : ss.initFiles) {
      int rc = processFile(initFile);
      ...
    }
    ...
}
1.2 File Processing: processFile

Now let's look at how Hive processes the statements in an HQL file via processFile. processFile resolves the given file path and opens the file; the actual work is delegated to processReader:

public int processFile(String fileName) throws IOException {
    Path path = new Path(fileName);
    FileSystem fs;
    // relative path: qualify it against the local file system
    if (!path.toUri().isAbsolute()) {
      fs = FileSystem.getLocal(conf);
      path = fs.makeQualified(path);
    } else {
      fs = FileSystem.get(path.toUri(), conf);
    }
    BufferedReader bufferReader = null;
    int rc = 0;
    try {
      bufferReader = new BufferedReader(new InputStreamReader(fs.open(path)));
      rc = processReader(bufferReader);
    } finally {
      IOUtils.closeStream(bufferReader);
    }
    return rc;
}

As we can see, processReader's logic is simple: it drops comment lines and hands everything else to processLine:

public int processReader(BufferedReader r) throws IOException {
    String line;
    StringBuilder qsb = new StringBuilder();
    // process the file line by line
    while ((line = r.readLine()) != null) {
      // skip comment lines
      if (!line.startsWith("--")) {
        qsb.append(line + "\n");
      }
    }
    return (processLine(qsb.toString()));
}

2. HQL Statement Splitting

2.1 Entry Point

Besides processFile, which as shown above ends up calling processLine, hive -e <query-string> also calls it when executing a query string:

CliDriver#executeDriver

if (ss.execString != null) {
    int cmdProcessStatus = cli.processLine(ss.execString);
    return cmdProcessStatus;
}

In addition, HQL statements read from standard input in interactive mode are also handed to processLine:

CliDriver#executeDriver

while ((line = reader.readLine(curPrompt + "> ")) != null) {
  ...
  // a statement is only considered complete when the line ends with a semicolon
  if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
    line = prefix + line;
    // execution of the HQL statement is delegated to processLine
    ret = cli.processLine(line, true);
    ...
  } else {
    ...
  }
}

2.2 Line Processing: processLine

Now let's look at how Hive processes an HQL statement through processLine.

The first part of the logic handles interrupt signals. When processLine(line, true) is called, the current job is allowed to be interrupted. The first time the user presses Ctrl+C, the CLI prints Interrupting... Be patient, this might take some time., interrupts the CLI thread, and kills the running MR jobs. The second time the user presses Ctrl+C, the JVM exits:

SignalHandler oldSignal = null;
Signal interruptSignal = null;
if (allowInterrupting) {
  interruptSignal = new Signal("INT");
  oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
    private boolean interruptRequested;
    @Override
    public void handle(Signal signal) {
      boolean initialRequest = !interruptRequested;
      interruptRequested = true;
      if (!initialRequest) {
        // second Ctrl+C: exit the JVM
        console.printInfo("Exiting the JVM");
        System.exit(127);
      }
      // first Ctrl+C: interrupt the CLI thread, stop the current statement and kill running MR jobs
      console.printInfo("Interrupting... Be patient, this might take some time.");
      console.printInfo("Press Ctrl+C again to kill JVM");
      HadoopJobExecHelper.killRunningJobs();
      TezJobExecHelper.killRunningJobs();
      HiveInterruptUtils.interrupt();
    }
  });
}

The second part of the logic splits the HQL line into separate commands:

// split the line into individual HQL commands
List<String> commands = splitSemiColon(line);
String command = "";
for (String oneCmd : commands) {
  if (StringUtils.endsWith(oneCmd, "\\")) {
    command += StringUtils.chop(oneCmd) + ";";
    continue;
  } else {
    command += oneCmd;
  }
  if (StringUtils.isBlank(command)) {
    continue;
  }
  // hand each command to processCmd
  ret = processCmd(command);
  command = "";
  lastRet = ret;
  boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
  if (ret != 0 && !ignoreErrors) {
    CommandProcessorFactory.clean((HiveConf) conf);
    return ret;
  }
}
Upstream, processLine is only invoked once a line ends with a semicolon ;, so a single line may contain more than one HQL command; the following line, for example, contains two:

hive > USE default;SELECT concat(uid, ";") FROM behavior LIMIT 1;

The first step is therefore to split the line into individual HQL commands. Note that a plain split on ; will not do, because a ; can appear inside an HQL statement, as in the second command above; that is why the dedicated splitSemiColon method is used. Each resulting command is then handed to processCmd for execution.
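Hive's actual splitSemiColon implementation is not reproduced here, but the idea can be sketched as a quote-aware split: a semicolon only terminates a command when it is not inside a string literal. This is a simplified toy version under that assumption; the real method also deals with escapes and comments:

import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // Minimal sketch of quote-aware splitting; not Hive's real splitSemiColon.
    static List<String> splitBySemicolon(String line) {
        List<String> parts = new ArrayList<>();
        boolean inSingle = false, inDouble = false;
        StringBuilder cur = new StringBuilder();
        for (char c : line.toCharArray()) {
            if (c == '\'' && !inDouble) inSingle = !inSingle;       // toggle single-quote state
            else if (c == '"' && !inSingle) inDouble = !inDouble;   // toggle double-quote state
            if (c == ';' && !inSingle && !inDouble) {
                parts.add(cur.toString());                          // end of one command
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        parts.add(cur.toString());   // trailing piece (may be blank, like Hive's blank-command skip)
        return parts;
    }

    public static void main(String[] args) {
        // the semicolon inside the string literal stays with its command
        System.out.println(splitBySemicolon(
                "USE default;SELECT concat(uid, \";\") FROM behavior LIMIT 1;"));
    }
}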
