使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成Java需编写JDBC连接Hive解析元数据

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成Java需编写JDBC连接Hive解析元数据相关的知识,希望对你有一定的参考价值。

使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成【Java】需编写JDBC连接Hive解析元数据

背景

之前有介绍过一种Hive2Hive的方式:https://lizhiyong.blog.csdn.net/article/details/124575115

走JDBC一定会在租户集群跑Map Reduce或者Hive On Tez任务,性能极烂,从主机群给租户集群推数据这种大规模应用的场景显然是不合适的。

一种可以实现Hive数据跨集群推送的方式就是直接推送数据文件到租户集群的HDFS路径。我们之前有强制规定必须全部使用Parquet格式,故本文主要介绍适用于Parquet的方式【在生产环境验证可行,可以抗300T/d的流量】。当然Orc和TextFile格式也可能适用【生产环境出现非Parquet格式的文件时我们不保证数据准确性】。

生产级代码是不可能公之于众的,笔者只上Demo级代码,抛砖引玉。有了方案,代码还是要自己去写!!!笔者曾经也是一行一行地将Java代码撸出来,一步一步调好的,伸手党自觉点。。。

当然本文也不适合只知道spark.sql这种水平的肤浅SQL Boy们,SQL在这方面除了解析元数据有点用,真就一无是处了。并且解析元数据或者alter表的操作也不是非得要SQL,可以使用MetaStore的API,可以直接动Hive存储在mysql的元数据,办法多的是。。一定要有Java和Hadoop基础再往下看。。。

解析Hive在HDFS的路径

话不多说,上代码:

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.LinkedList;

public class HiveJdbcPara 
    public static void main(String[] args) throws Exception 
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection connection = DriverManager
                .getConnection("jdbc:hive2://192.168.88.11:10000/default",
                        "root",
                        "123456");//Kerberos方式只需要一个jdbc地址即可
        //Kerberos方式需要使用Hive的连通性测试代码,jdbc地址与其相同
        LinkedList<String> linkedList = new LinkedList<>();//存储结果
        String db_name = "aaa";//库名
        String tb_name = "testpar1";//表名
        Statement stmt = connection.createStatement();
        String sql1 = "show databases";
        ResultSet rsSet1 = stmt.executeQuery(sql1);//查表
        while (rsSet1.next()) //遍历
            String col1 = rsSet1.getString(1);
            System.out.println("dbName = " + col1);//显示
        

        String sql2 = "use " + db_name;
        stmt.execute(sql2);//切换为aaa数据库

        String sql3 = "show tables";
        ResultSet rsSet2 = stmt.executeQuery(sql3);//查表
        while (rsSet2.next()) //遍历
            String col1 = rsSet2.getString(1);
            System.out.println(db_name + ".tbName = " + col1);//显示
        

        System.out.println("************************************\\n\\n\\n");

        String sql4 = "describe formatted " + db_name + "." + tb_name;//查看表格式(这里改成要查的库名.表名)
        ResultSet rsSet4 = stmt.executeQuery(sql4);
        String savType = "";//存储模式:textfile、orc、parquet
        String splitChar = ",";//分隔符:"\\t",",","xxx"
        String hdfsFullPath = "";//hdfs全路径
        String hdfsPathURL = "";//hdfs路径
        String hiveWareDir = "";//hive仓库路径

        int counter = 0;
        while (rsSet4.next()) 
            String col_name = rsSet4.getString(1);//第一列
            String data_type = rsSet4.getString(2);//第2列
            String comment = rsSet4.getString(3);//第3列
            System.out.println(col_name + "->" + data_type + "->" + comment + "->");
            linkedList.add(col_name + "->" + data_type + "->" + comment + "->");

            if (col_name.equals("Location:           ")) 
                if (hdfsFullPath.length() == 0) 
                    hdfsFullPath = data_type;//获取到存储的hdfs的全路径
                 else if (hdfsFullPath.length() > data_type.length()) 
                    hdfsFullPath = data_type;//获取到存储的hdfs的全路径
                
                System.out.println("存储位置:" + data_type);
                //例如:hdfs://node1:8020/user/hive/warehouse/aaa.db/testpar1
                //分区表可能有多个值,如:
                //hdfs://node1:8020/user/hive/warehouse/aaa.db/testpar1/day=20211017/country=cn
                //hdfs://node1:8020/user/hive/warehouse/aaa.db/testpar1/day=20211018/country=china
                //需要取最短的
            

            if (col_name.equals("InputFormat:        ")) 
                switch (data_type) 
                    case "org.apache.hadoop.mapred.TextInputFormat":
                        savType = "textfile";
                        break;
                    case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat":
                        savType = "orc";
                        break;
                    case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat":
                        savType = "parquet";
                        break;
                    default:
                        savType = "error";
                        break;
                
                System.out.println("输入格式:" + data_type + "->" + savType);
            

            counter++;

        


        System.out.println(counter);

        for (int i = 0; i < linkedList.size(); i++) 
            String cell = linkedList.get(i);
            if (cell.contains("serialization.format")) 
                String[] split = cell.split("->");
                splitChar = split[2].trim();//获取到分隔符
                System.out.println(splitChar);
            

        

        int index = 0;//脚标计数器
        char[] chars = hdfsFullPath.toCharArray();
        for (int i = 0; i < chars.length; i++) 
            char aChar = chars[i];
            if (aChar == '/') 
                index++;
                System.out.println(i);
            
            if (index == 2) 
                hdfsPathURL = hdfsFullPath.substring(0, i + 1);
            
        

        String[] split1 = hdfsFullPath.split(".db");
        String[] split2 = split1[0].split(hdfsPathURL + "/");
        String[] split3 = split2[1].split("/" + db_name);
        hiveWareDir = split3[0];
        System.out.println(split3[0]);


        stmt.close();//关流
        connection.close();//关流

        System.out.println("存储格式:" + savType);
        System.out.println("分隔符:" + splitChar);
        System.out.println("全路径:" + hdfsFullPath);
        System.out.println("hdfsURL:" + hdfsPathURL);
        System.out.println("hive仓库路径:" + hiveWareDir);

    

于是解析到了Hive在HDFS的路径信息及分隔符、存储格式等信息。TextFile格式用得到分隔符这种信息,不解释。Parquet和Orc则用不上分隔符的信息。

出于种种原因,我们强制规定库名、表名、字段名等都必须是纯小写,严禁有大小驼峰等骚操作,如果胆敢不按照规定,我们的平台不保证数据准确性。纯小写的好处不言而喻,尤其是对于平台组件二开人员来说,可以省去不少的麻烦,这点懂得都懂。

此Demo不适用于解析default库下Hive表的元数据,生产级代码当然是支持的。。。

写文件

获取文件

df.write().partitionBy("parcol1","parcol2").parquet("hdfs://node1:8020/aaa");
df.write().partitionBy("parcol1","parcol2").orc("hdfs://zhiyong-1/aaa");
df.write().partitionBy("parcol1","parcol2").text("hdfs://zhiyong-2/aaa");

这种方式即可简单高效地将数据做partitionBy,并且在HDFS的路径下生成snappy压缩的Parquet文件块。明眼人都能看出来,一定是不能直接写到Hive表的路径下,否则会把Hive在HDFS的路径直接干翻!!!这种方式获取到的Parquet文件块已经按照分区字段做了PartitionBy,已经按照文件夹放置整齐了,后续操作很是方便,可以实现多级分区,比阿里云DataPhin只支持单级分区要先进很多。

由于是跨集群,运维打通Kerberos的跨域跨集群认证后,Spark就可以直接将Parquet文件写到租户集群。

平台组件二开是个高危岗位,如果胆敢出现这种生产事故就会被开除!!!能理解吧?

写到HDFS的某个缓存路径后,才完成了万里长征的一小步!!!

由于众所周知的原因,Spark为了实现更快的写入,并行度会很高【默认200】,容易出现大量的小文件,对HDFS来说是致命的严重问题!!!故还需要做小文件合并的操作。

小文件合并

越底层的方式性能越好,平台组件开发最关心的就是性能问题。高效的方式是使用HDFS的API,遍历这些Parquet文件块,将<某个阈值的小文件们集中到另一个缓存路径【使用rename】,Spark将这些Parquet文件再次读一遍,并且根据某种算法确定Partition个数,进行打散:

  /**
   * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions
   * are requested. If a larger number of partitions is requested, it will stay at the current
   * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
   * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
   * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can call repartition. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * @group typedrel
   * @since 1.6.0
   */
  def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan 
    Repartition(numPartitions, shuffle = false, logicalPlan)
  

当然是要使用coalesce来杜绝Shuffle。此处不得不点名批评肤浅的SQL Boy们。。。平台组件二开的攻城狮们辛辛苦苦做优化,SQL Boy们各种Join尽可能多地去Shuffle并且丝毫不管小文件问题,导致集群性能一塌糊涂,也是服气。

合并完成后,小文件个数一定会大幅度缩减,此时搬回到之前的缓存路径,获取到GA的Parquet文件,就可以考虑将数据写入到Hive。

2PC提交

先要JDBC执行:

alter table 库名.表名 drop partition parcol1=v1;

这一步只需要处理一级分区。为了解决Hive3.1.3000-297的ACID问题,我们强制规定统一使用external外部表,否则不保证平台功能正常或者数据准确。这种操作其实只动了元数据,底层HDFS路径没什么变化。接下来就是delete掉相关的路径。

在delete完成后,即可将之前完成了小文件合并的Parquet文件从缓存路径rename到Hive表在HDFS的路径,完成文件搬运的操作。

搬运完成后,还需要做元数据修正:

alter table 库名.表名 add partition (parcol1=v1,parcol2=v2);
msck repair table 库名.表名;

这2种方式各有利弊。性能好的局限性大,代码量当然更不会少。性能差的适用范围广,代码量还更少。这就是个仁者见仁智者见智的问题了。选用哪种方式,看具体场景和开发者的功力,这都是个可以讲很久的庞大问题。

完成这一步元数据修复的操作后,正常情况就可以在Hive中查看到数据。生产环境的坑还是很多的,当然一切都不会像想象中的那么美好,不过一切也都不会像想象中的那么糟糕。不断的出现问题并不怕,怕的是攻城狮们定位不到问题或者技术上、方案上解决不了问题。攻城狮存在的意义就是解决问题。

尾言

可能对于肤浅的SQL Boy们来说,跨集群Hive2Hive就是简单地手动建好表再去搞个Distcp脚本搬文件就行【貌似只有平台和运维才会考虑自动化,SQL Boy们热衷于堆人力来徒手写脚本】。也只有做历史数据同步、集群迁移、数据比对时用得上,更多的时候是运维、网管负责,当然不能怪SQL Boy们只会SQL。。。但是只会SQL注定在大数据开发这条路是走不远的,可能很多人也并没有想走多远。

SQL的局限性不止于不能跨机器跨集群,在面对异构数据源、文件处理等操作时也是相当吃力。虽然Calcite、CataLyst等组件有CBO、RBO等优化器,可能SQL的性能大部分场景还可以接受,但是遇到严重的性能问题时,SQL调优并不见得比DSL方式下API调优容易或有效。面对小文件等调用API极其容易解决的问题,纯SQL表现同样乏力,自由度差了也不是一点半点。只能在条条框框中做事,必然摆脱不掉严重的局限性。。。

顺带提及下,目前已经有专业的数据集成组件SeaTunnel:http://seatunnel.incubator.apache.org/

阿里云DataPhin从Log上看,可能是基于Kettle或者Dlink实现的数据集成。

为何还需要自行实现数据集成的功能?单纯为了性能更好,也方便对依赖的POM做统一的规划与管理。花费不小的人力物力财力去重复造轮子【美其名曰自研】,当然有充足的理由。

平台组件二开这种高危岗位可不像SQL岗位是个人都能写几下,想要活过试用期并且不被开除还是要有两把刷子的。。。多说无益,仅以此纪念该功能提测上线1周年。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/127680034

以上是关于使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成Java需编写JDBC连接Hive解析元数据的主要内容,如果未能解决你的问题,请参考以下文章

二次开发Spark实现JDBC读取远程租户集群Hive数据并落地到本集群Hive的Hive2Hive数据集成Java

二次开发Spark实现JDBC读取远程租户集群Hive数据并落地到本集群Hive的Hive2Hive数据集成Java

使用 Spark 查询位于远程集群上的 Hive 数据

hive on spark僵死问题分析

Spark & Hive 云原生改造在智领云的应用

hadoop集群搭建(Hadoop 3.1.3 /Hive 3.1.2/Spark 3.0.0)