使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成Java需编写JDBC连接Hive解析元数据
Posted 虎鲸不是鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成Java需编写JDBC连接Hive解析元数据相关的知识,希望对你有一定的参考价值。
使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成【Java】需编写JDBC连接Hive解析元数据
背景
之前有介绍过一种Hive2Hive的方式:https://lizhiyong.blog.csdn.net/article/details/124575115
走JDBC一定会在租户集群跑Map Reduce或者Hive On Tez任务,性能极烂,从主机群给租户集群推数据这种大规模应用的场景显然是不合适的。
一种可以实现Hive数据跨集群推送的方式就是直接推送数据文件到租户集群的HDFS路径。我们之前有强制规定必须全部使用Parquet格式,故本文主要介绍适用于Parquet的方式【在生产环境验证可行,可以抗300T/d的流量】。当然Orc和TextFile格式也可能适用【生产环境出现非Parquet格式的文件时我们不保证数据准确性】。
生产级代码是不可能公之于众的,笔者只上Demo级代码,抛砖引玉。有了方案,代码还是要自己去写!!!笔者曾经也是一行一行地将Java代码撸出来,一步一步调好的,伸手党自觉点。。。
当然本文也不适合只知道spark.sql这种水平的肤浅SQL Boy们,SQL在这方面除了解析元数据有点用,真就一无是处了。并且解析元数据或者alter表的操作也不是非得要SQL,可以使用MetaStore的API,可以直接动Hive存储在mysql的元数据,办法多的是。。一定要有Java和Hadoop基础再往下看。。。
解析Hive在HDFS的路径
话不多说,上代码:
package com.zhiyong;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.LinkedList;
public class HiveJdbcPara
public static void main(String[] args) throws Exception
Class.forName("org.apache.hive.jdbc.HiveDriver");
Connection connection = DriverManager
.getConnection("jdbc:hive2://192.168.88.11:10000/default",
"root",
"123456");//Kerberos方式只需要一个jdbc地址即可
//Kerberos方式需要使用Hive的连通性测试代码,jdbc地址与其相同
LinkedList<String> linkedList = new LinkedList<>();//存储结果
String db_name = "aaa";//库名
String tb_name = "testpar1";//表名
Statement stmt = connection.createStatement();
String sql1 = "show databases";
ResultSet rsSet1 = stmt.executeQuery(sql1);//查表
while (rsSet1.next()) //遍历
String col1 = rsSet1.getString(1);
System.out.println("dbName = " + col1);//显示
String sql2 = "use " + db_name;
stmt.execute(sql2);//切换为aaa数据库
String sql3 = "show tables";
ResultSet rsSet2 = stmt.executeQuery(sql3);//查表
while (rsSet2.next()) //遍历
String col1 = rsSet2.getString(1);
System.out.println(db_name + ".tbName = " + col1);//显示
System.out.println("************************************\\n\\n\\n");
String sql4 = "describe formatted " + db_name + "." + tb_name;//查看表格式(这里改成要查的库名.表名)
ResultSet rsSet4 = stmt.executeQuery(sql4);
String savType = "";//存储模式:textfile、orc、parquet
String splitChar = ",";//分隔符:"\\t",",","xxx"
String hdfsFullPath = "";//hdfs全路径
String hdfsPathURL = "";//hdfs路径
String hiveWareDir = "";//hive仓库路径
int counter = 0;
while (rsSet4.next())
String col_name = rsSet4.getString(1);//第一列
String data_type = rsSet4.getString(2);//第2列
String comment = rsSet4.getString(3);//第3列
System.out.println(col_name + "->" + data_type + "->" + comment + "->");
linkedList.add(col_name + "->" + data_type + "->" + comment + "->");
if (col_name.equals("Location: "))
if (hdfsFullPath.length() == 0)
hdfsFullPath = data_type;//获取到存储的hdfs的全路径
else if (hdfsFullPath.length() > data_type.length())
hdfsFullPath = data_type;//获取到存储的hdfs的全路径
System.out.println("存储位置:" + data_type);
//例如:hdfs://node1:8020/user/hive/warehouse/aaa.db/testpar1
//分区表可能有多个值,如:
//hdfs://node1:8020/user/hive/warehouse/aaa.db/testpar1/day=20211017/country=cn
//hdfs://node1:8020/user/hive/warehouse/aaa.db/testpar1/day=20211018/country=china
//需要取最短的
if (col_name.equals("InputFormat: "))
switch (data_type)
case "org.apache.hadoop.mapred.TextInputFormat":
savType = "textfile";
break;
case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat":
savType = "orc";
break;
case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat":
savType = "parquet";
break;
default:
savType = "error";
break;
System.out.println("输入格式:" + data_type + "->" + savType);
counter++;
System.out.println(counter);
for (int i = 0; i < linkedList.size(); i++)
String cell = linkedList.get(i);
if (cell.contains("serialization.format"))
String[] split = cell.split("->");
splitChar = split[2].trim();//获取到分隔符
System.out.println(splitChar);
int index = 0;//脚标计数器
char[] chars = hdfsFullPath.toCharArray();
for (int i = 0; i < chars.length; i++)
char aChar = chars[i];
if (aChar == '/')
index++;
System.out.println(i);
if (index == 2)
hdfsPathURL = hdfsFullPath.substring(0, i + 1);
String[] split1 = hdfsFullPath.split(".db");
String[] split2 = split1[0].split(hdfsPathURL + "/");
String[] split3 = split2[1].split("/" + db_name);
hiveWareDir = split3[0];
System.out.println(split3[0]);
stmt.close();//关流
connection.close();//关流
System.out.println("存储格式:" + savType);
System.out.println("分隔符:" + splitChar);
System.out.println("全路径:" + hdfsFullPath);
System.out.println("hdfsURL:" + hdfsPathURL);
System.out.println("hive仓库路径:" + hiveWareDir);
于是解析到了Hive在HDFS的路径信息及分隔符、存储格式等信息。TextFile格式用得到分隔符这种信息,不解释。Parquet和Orc则用不上分隔符的信息。
出于种种原因,我们强制规定库名、表名、字段名等都必须是纯小写,严禁有大小驼峰等骚操作,如果胆敢不按照规定,我们的平台不保证数据准确性。纯小写的好处不言而喻,尤其是对于平台组件二开人员来说,可以省去不少的麻烦,这点懂得都懂。
此Demo不适用于解析default库下Hive表的元数据,生产级代码当然是支持的。。。
写文件
获取文件
df.write().partitionBy("parcol1","parcol2").parquet("hdfs://node1:8020/aaa");
df.write().partitionBy("parcol1","parcol2").orc("hdfs://zhiyong-1/aaa");
df.write().partitionBy("parcol1","parcol2").text("hdfs://zhiyong-2/aaa");
这种方式即可简单高效地将数据做partitionBy,并且在HDFS的路径下生成snappy压缩的Parquet文件块。明眼人都能看出来,一定是不能直接写到Hive表的路径下,否则会把Hive在HDFS的路径直接干翻!!!这种方式获取到的Parquet文件块已经按照分区字段做了PartitionBy,已经按照文件夹放置整齐了,后续操作很是方便,可以实现多级分区,比阿里云DataPhin只支持单级分区要先进很多。
由于是跨集群,运维打通Kerberos的跨域跨集群认证后,Spark就可以直接将Parquet文件写到租户集群。
平台组件二开是个高危岗位,如果胆敢出现这种生产事故就会被开除!!!能理解吧?
写到HDFS的某个缓存路径后,才完成了万里长征的一小步!!!
由于众所周知的原因,Spark为了实现更快的写入,并行度会很高【默认200】,容易出现大量的小文件,对HDFS来说是致命的严重问题!!!故还需要做小文件合并的操作。
小文件合并
越底层的方式性能越好,平台组件开发最关心的就是性能问题。高效的方式是使用HDFS的API,遍历这些Parquet文件块,将<某个阈值的小文件们集中到另一个缓存路径【使用rename】,Spark将这些Parquet文件再次读一遍,并且根据某种算法确定Partition个数,进行打散:
/**
* Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions
* are requested. If a larger number of partitions is requested, it will stay at the current
* number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
* a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
* be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can call repartition. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @group typedrel
* @since 1.6.0
*/
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan
Repartition(numPartitions, shuffle = false, logicalPlan)
当然是要使用coalesce来杜绝Shuffle。此处不得不点名批评肤浅的SQL Boy们。。。平台组件二开的攻城狮们辛辛苦苦做优化,SQL Boy们各种Join尽可能多地去Shuffle并且丝毫不管小文件问题,导致集群性能一塌糊涂,也是服气。
合并完成后,小文件个数一定会大幅度缩减,此时搬回到之前的缓存路径,获取到GA的Parquet文件,就可以考虑将数据写入到Hive。
2PC提交
先要JDBC执行:
alter table 库名.表名 drop partition parcol1=v1;
这一步只需要处理一级分区。为了解决Hive3.1.3000-297的ACID问题,我们强制规定统一使用external外部表,否则不保证平台功能正常或者数据准确。这种操作其实只动了元数据,底层HDFS路径没什么变化。接下来就是delete掉相关的路径。
在delete完成后,即可将之前完成了小文件合并的Parquet文件从缓存路径rename到Hive表在HDFS的路径,完成文件搬运的操作。
搬运完成后,还需要做元数据修正:
alter table 库名.表名 add partition (parcol1=v1,parcol2=v2);
msck repair table 库名.表名;
这2种方式各有利弊。性能好的局限性大,代码量当然更不会少。性能差的适用范围广,代码量还更少。这就是个仁者见仁智者见智的问题了。选用哪种方式,看具体场景和开发者的功力,这都是个可以讲很久的庞大问题。
完成这一步元数据修复的操作后,正常情况就可以在Hive中查看到数据。生产环境的坑还是很多的,当然一切都不会像想象中的那么美好,不过一切也都不会像想象中的那么糟糕。不断的出现问题并不怕,怕的是攻城狮们定位不到问题或者技术上、方案上解决不了问题。攻城狮存在的意义就是解决问题。
尾言
可能对于肤浅的SQL Boy们来说,跨集群Hive2Hive就是简单地手动建好表再去搞个Distcp脚本搬文件就行【貌似只有平台和运维才会考虑自动化,SQL Boy们热衷于堆人力来徒手写脚本】。也只有做历史数据同步、集群迁移、数据比对时用得上,更多的时候是运维、网管负责,当然不能怪SQL Boy们只会SQL。。。但是只会SQL注定在大数据开发这条路是走不远的,可能很多人也并没有想走多远。
SQL的局限性不止于不能跨机器跨集群,在面对异构数据源、文件处理等操作时也是相当吃力。虽然Calcite、CataLyst等组件有CBO、RBO等优化器,可能SQL的性能大部分场景还可以接受,但是遇到严重的性能问题时,SQL调优并不见得比DSL方式下API调优容易或有效。面对小文件等调用API极其容易解决的问题,纯SQL表现同样乏力,自由度差了也不是一点半点。只能在条条框框中做事,必然摆脱不掉严重的局限性。。。
顺带提及下,目前已经有专业的数据集成组件SeaTunnel:http://seatunnel.incubator.apache.org/
阿里云DataPhin从Log上看,可能是基于Kettle或者Dlink实现的数据集成。
为何还需要自行实现数据集成的功能?单纯为了性能更好,也方便对依赖的POM做统一的规划与管理。花费不小的人力物力财力去重复造轮子【美其名曰自研】,当然有充足的理由。
平台组件二开这种高危岗位可不像SQL岗位是个人都能写几下,想要活过试用期并且不被开除还是要有两把刷子的。。。多说无益,仅以此纪念该功能提测上线1周年。
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/127680034
以上是关于使用Spark实现推主机群Hive数据到租户集群Hive的高性能Hive2Hive数据集成Java需编写JDBC连接Hive解析元数据的主要内容,如果未能解决你的问题,请参考以下文章
二次开发Spark实现JDBC读取远程租户集群Hive数据并落地到本集群Hive的Hive2Hive数据集成Java