导入HDFS的数据到Hive

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了导入HDFS的数据到Hive相关的知识,希望对你有一定的参考价值。

1. 通过Hive view

CREATE EXTERNAL TABLE if not exists finance.json_serde_optd_table (
  retCode string,
  retMsg string,
  data array<struct< secid:string,="" tradedate:date,="" optid:string,="" ticker:string,="" secshortname:string,="" exchangecd:string,="" presettleprice:double,="" precloseprice:double,="" openprice:double,="" highestprice:double,="" lowestprice:double,="" closeprice:double,="" settlprice:double,="" turnovervol:double,="" turnovervalue:double,="" openint:int="">>)
ROW FORMAT SERDE ‘org.apache.hive.hcatalog.data.JsonSerDe‘
LOCATION ‘hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/‘;
create table if not exists finance.tb_optd
as
SELECT b.data.secID,
		b.data.tradeDate,
		b.data.optID,
		b.data.ticker,
		b.data.secShortName,
		b.data.exchangeCD,
		b.data.preSettlePrice,
		b.data.preClosePrice,
		b.data.openPrice,
		b.data.highestPrice,
		b.data.lowestPrice,
		b.data.closePrice,
		b.data.settlPrice,
		b.data.turnoverVol,
		b.data.turnoverValue,
		b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
?

2. 通过Zeppelin

?

%dep
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");

?

// 定义导入的hive对象集合

case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
?
// 创建equd数据结构
// 定义json结构
val schema_json_equd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< secid="" :="" string,="" tradedate="" date,="" ticker="" secshortname="" exchangecd="" precloseprice="" double,="" actprecloseprice:="" openprice="" highestprice="" lowestprice="" closeprice="" turnovervol="" turnovervalue="" dealamount="" int,="" turnoverrate="" accumadjfactor="" negmarketvalue="" marketvalue="" pe="" pe1="" pb="" isopen="" int="">>""";
var schema_equd ="""b.data.secID,
            		b.data.ticker,
            		b.data.secShortName,
            		b.data.exchangeCD,
            		b.data.tradeDate,
            		b.data.preClosePrice,
            		b.data.actPreClosePrice,
            		b.data.openPrice,
            		b.data.highestPrice,
            		b.data.lowestPrice,
            		b.data.closePrice,
            		b.data.turnoverVol,
            		b.data.turnoverValue,
            		b.data.dealAmount,
            		b.data.turnoverRate,
            		b.data.accumAdjFactor,
            		b.data.negMarketValue,
            		b.data.marketValue,
            		b.data.PE,
            		b.data.PE1,
            		b.data.PB,
            		b.data.isOpen""";
hiveConfigList  = hiveConfigList :+ HiveConfig("finance", "equd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd);

?

// 创建idxd数据结构
// 定义json结构
val schema_json_idxd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< indexid:string,="" tradedate:date,="" ticker:string,="" porgfullname:string,="" secshortname:string,="" exchangecd:string,="" precloseindex:double,="" openindex:double,="" lowestindex:double,="" highestindex:double,="" closeindex:double,="" turnovervol:double,="" turnovervalue:double,="" chg:double,="" chgpct:double="">>""";
var schema_idxd ="""b.data.indexID,
            		b.data.tradeDate,
            		b.data.ticker,
            		b.data.porgFullName,
            		b.data.secShortName,
            		b.data.exchangeCD,
            		b.data.preCloseIndex,
            		b.data.openIndex,
            		b.data.lowestIndex,
            		b.data.highestIndex,
            		b.data.closeIndex,
            		b.data.turnoverVol,
            		b.data.turnoverValue,
            		b.data.CHG,
            		b.data.CHGPct""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "idxd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd);

?

// 循环加载数据中
  def loadDataToHive(args:HiveConfig){
    val loadPath = args.hdfsPath + args.modelName;
    val tb_json_serde = "json_serde_" + args.modelName +"_table";
    val tb= "tb_" + args.modelName;
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    if(args.database != "" && args.schema != "") {
        print("正在创建项目..." + args.modelName)
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
        print("正在构造扩展模型...");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde + "(" + args.schema + ") row format serde ‘org.apache.hive.hcatalog.data.JsonSerDe‘ LOCATION " + "‘" + loadPath + "/‘");
        println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        println(args.modelName + " 扩展模型加载已完成!");
    }
  }
  hiveConfigList.size;
  hiveConfigList.foreach { x => loadDataToHive(x) };

?

?

?

以上是关于导入HDFS的数据到Hive的主要内容,如果未能解决你的问题,请参考以下文章

mysql导入到hive hdfs上显示又数据了 hive表里面啥都没有

导入HDFS的数据到Hive

导入HDFS的数据到Hive

sqoop命令,oracle导入到hdfs、hbase、hive

如何周期性把每天日志导入hive

第3节 sqoop:4sqoop的数据导入之导入数据到hdfs和导入数据到hive表