hive基础2

Posted star521

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hive基础2相关的知识,希望对你有一定的参考价值。

RDBMS

OLTP.
relation database management system,关系型数据库管理系统。
支持事务(acid)
延迟低
安全行

V

variaty : 多样性。

hive

mr,sql
开发效率高。
数据仓库。
数据库:            //OLTP
OLTP            //online transaction process,
OLAP            //online analyze process,在线分析处理 , 很多分析函数
                //rank | lag | lead | .. | cube | rollup | grouping sets
                //grouping sets
                //group by 多次shuffle。

数据

hive元数据     //表结构信息存在关系型数据库中derby。

1.初始化hive
    修改hive-site.xml
    添加驱动到hivelib下
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
      </property>
      <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://192.168.231.1:3306/big11_hive</value>
            <description>
              JDBC connect string for a JDBC metastore.
              To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.
              For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.
            </description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>Username to use against metastore database</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
        <description>password to use against metastore database</description>
      </property>
      <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
        <description>
          Setting this property to true will have HiveServer2 execute
          Hive operations as the user making the calls to it.
        </description>
      </property>


查看日志
------------
    /tmp/

2.登录hive
    $>
3.

hive数据类型

1.简单类型
    int 
    string
    bigint
    float
    ...

2.复杂类型
    array
    map
    struct
    named struct
    union

复杂类型

Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer^DLead
Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead
Shelley|New York|Female,27|Python:80|Test:Lead,COE:Architect
Lucy|Vancouver|Female,57|Sales:89,HR:94|Sales:Lead

CREATE TABLE employee
(
name string,
arr ARRAY<string>,
stru STRUCT<sex:string,age:int>,
map1 MAP<string,int>,
map2 MAP<string,ARRAY<string>>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘|‘
COLLECTION ITEMS TERMINATED BY ‘,‘
MAP KEYS TERMINATED BY ‘:‘
lines terminated by ‘
‘;

加载数据到hive

load data local inpath ‘/home/centos/emp.txt‘ into table employee ;

hive split函数

select split(‘hello world‘ , ‘ ‘) from employee ;

使用hive实现wordcount计算

1.创建表
    CREATE TABLE doc
    (
    line string
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ‘,‘
    lines terminated by ‘
‘;
2.加载数据
    load data local inpath ‘/home/centos/1.txt‘ into table doc ;
3.统计
    select 
    t.word , 
    count(t.word) cnt
    from
    ( 
    select explode(split(line , ‘ ‘)) word from doc
    ) 
    t 
    group by 
    t.word

    order by
    cnt desc 
    limit 3 ;
4.

内部表

managed表.
删除时,表结构和数据同时删除。

外部表

删除时,只删除表结构,不删除数据。

?

执行hive生成jar
--------------------
hadoop jar job.jar org.apache.hadoop.hive.ql.exec.mr.ExecDriver

CTAS

create table t2 as select * from temployee ;        //带数据

create table t3 like t1 ;

使用hiveserver2服务器连接hive

1.hiveserver &
2.beeline连接
    $>beeline
    $beeline>!connect jdbc:hive2://localhost:10000/default ;

3.beeline命令行
    $beeline>select * from t1 ;

格式

[列存储]
rcfile
orc
parquet                 //

select * from t ;

*       //全字段扫描,
select id from t ;      //投影查询

kafka

零拷贝 + 磁盘线性读写。

hive创建列存储格式文件

磁盘线性读写 + 投影查询
create table t_orc (id int , name string , age int) stored as orc ;
create table t_orc (id int , name string , age int) stored as rcfile ;
create table t_orc (id int , name string , age int) stored as parquet ;

hive分区

分区就是子目录。

1.创分区表
CREATE external TABLE t_ext_par
(
id int ,
name string
)
PARTITIONED BY (prov string, city string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘,‘
lines terminated by ‘ ‘;

//显示分区记录
$>SHOW PARTITIONS t_ext_par;


//添加分区
alter table t_ext_par add partition(prov=‘henan‘ , city=‘kaifeng‘) partition(prov=‘henan‘ , city=‘zhengzhou‘); 

//删除分区
alter table t_ext_par DROP IF EXISTS PARTITION (prov=‘henan‘ , city=‘kaifeng‘);

//加载数据到指定分区
load data local inpath ‘/home/centos/1.txt‘ into table t_ext_par partition(prov=‘henan‘ , city=‘kaifeng‘) ;

//查询数据插入到指定的分区上
insert into t_ext_par partition(prov , city) select 1 , ‘tom‘ , ‘hebei‘ , ‘shijiazhuang‘;
insert into t_ext_par partition(prov=‘hebei‘ , city) select 1 , ‘tom‘ , ‘cc‘;       //指定了一个静态分区
insert into t_ext_par partition(prov , city) select 1 , ‘tom‘ ,‘Liaoning‘ , ‘DL‘;   //指定了两个都是动态分区

[动态分区]
在DML/DDL是否允许动态创建分区。默认true.
set hive.exec.dynamic.partition=true            //启用动态分区
set hive.exec.dynamic.partition=false           //禁用动态分区
set hive.exec.dynamic.partition                 //查看动态分区

[分区模式-严格和非严格]
set hive.exec.dynamic.partition.mode=strict     //严格模式,

桶表

1.创建语句
    CREATE TABLE t_buck
    (
    id int ,
    name string
    )
    CLUSTERED BY (id) INTO 3 BUCKETS
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ‘,‘
    lines terminated by  ‘
‘;

2.桶的个数设置
    让每个桶的数据量接近于block(128M)的2倍。
    预估数据量时500M,

3.注意事项
    load方法对bucket无效.

设置hive中reducer个数

$>set mapreduce.job.reduces=3 ;
$>select * from order by id desc ;                  //始终使用1个reduce实现全排序
$>select id , count(*) from t group by id    ;      //受reduces个数影响

view

//创建
create view v_emp as select name , arr[0] , stru.sex , map1[‘DB‘],map2[‘Product‘][0] from employee ; 
//修改
alter view v_emp as select * from t1 ;
//删除
drop view if exists  v_emp ;

查询

select distinct * from t1 ;     //可以指定不同个数的reduce.

抓取转换none | minimal | more

SET hive.fetch.task.conversion=more;
select * from t ;           //不是mr
SET hive.fetch.task.conversion=none;
select * from t ;           //转换成mr操作

准备数据

CREATE TABLE custs
(
id int ,
name string,
age int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘,‘
lines terminated by ‘ ‘ stored as textfile ;

CREATE TABLE orders
(
id int ,
orderno string,
price float ,
cid int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘,‘
lines terminated by ‘ ‘ stored as textfile ;

连接

//交叉连接
select c.* , o.* from customers c cross join orders o  ;                    //笛卡尔积
select c.* , o.* from customers c cross join orders o on  o.cid = c.id;     //等值连接
select c.* , o.* from customers c cross join orders o on  o.cid <> c.id;    //非等值连接

//map端连接查询暗示
select /*+mapjoin(custs)*/ c.*,o.* FROM custs c CROSS JOIN orders o WHERE c.id <> o.cid;

导出表

不但能够导出表结构,数据也能导出.
export table mydb.custs to ‘/user/centos/custs.dat‘

mr模式

set hive.mapred.mode = strict
select * from t_ext_par ;               //必须在分区列上进行过滤才可以。

排序

1.order by
    全排序,通过一个reduce实现。(数据倾斜)

2.sort by(部分排序)
    在每个reduce内排序法则。
    create table tmp2 as select * from orders sort by id desc ;

3.distribute by
    按照指定字段进行分发,就是分区过程。
    create table tmp3 as select * from orders distribute (by id % 2 ==0) sort by id desc ;

4.cluster by
    快捷方式 ,如果disbtribute by 和sort by使用的是相同字段,则可以使用cluster by直接完成
    select * from orders distribute id sort by id ;
    select * from orders cluster by id ;

函数

//
show functions ;                //查看所有函数
desc function split;            //查看特定函数
desc function extended split;   //扩展信息
select case when age < 13 then ‘0‘ when age > 13 then ‘2‘ else ‘1‘ from custs ;

//集合函数
select array(1,2,3) ;
select struct(1,2,3).`col1` ;   //结构体函数
select map(‘id‘ , 1 , ‘price‘ , 100.5) ;
select map(‘id‘ , 1 , ‘price‘ , 100.5)[‘id‘] ;

虚列

select id ,INPUT_FILE_NAME from t1 ;    //文件名

事务表

hive支持有条件的事务。
1.启动相关属性设置

SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
2.创建orc 桶表,并且携带事务属性
create table t_tx(id int ,name string , age int)
clustered by (id) into 2 buckets
stored as orc
TBLPROPERTIES(‘transactional‘=‘true‘);

准备数据

1.创建emp
    CREATE TABLE emp
    (
    id int ,
    name string ,
    age int,
    dep string,
    salary int
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ‘,‘
    lines terminated by  ‘
‘
    stored as textfile;
2.上传数据
    1,tom,22,001,3000
    2,tomas,22,001,2500
    3,tomasLee,25,002,1200
    4,tomson,24,002,3400
    5,peter,24,001,3800
    6,john,25,001,4200
    7,looser,24,001,5500
    8,alex,25,003,6000
    9,jason,24,003,6000
    10,japser,22,003,3000

高级聚合函数

select dep , avg(salary) from emp group by dep ;
select age , avg(salary) from emp group by age ;
//联合查询结果,需要3个job
select dep , avg(salary) from emp group by dep union select age , avg(salary) from emp group by age ;

//group by + grouping sets()
select avg(salary) from emp group by age,dep grouping sets((age) , (dep)) ;
select avg(salary) from emp group by age,dep grouping sets((age) , (dep) , (age,dep)) ;
select avg(salary) from emp group by age,dep grouping sets((age) , (dep) , (age,dep) , ()) ;

//rollup
select age,dep ,avg(salary) from emp group by age,dep with rollup ;
select age,dep ,avg(salary) from emp group by age,dep grouping sets((age,dep) , (age) , ()) ;

//cube
select age,dep ,avg(salary) from emp group by age,dep with cube ;
GROUP BY a,b,c GROUPING SETS ((a,b,c),(a,b),(b,c),(a,c),(a),(b),(c),())
select substr(name,1,1) ,age , dep , avg(salary) from emp  group by substr(name,1,1) ,age , dep with cube ;

分析函数

Function (arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_clause>])
//order by全部数据集上的排序
select id,name,age,salary , avg(salary) over (partition by dep) from emp order by dep;

//
select id,name,age,salary , first_value(salary) over (order by id desc) from emp order by id asc;
select id,name,age,salary , last_value(salary) over (order by id desc) from emp order by id asc;
select 
    id,
    name,
    LAST_VALUE(salary) OVER (PARTITION BY dep ORDER BY salary row BETWEEN UNBOUNDED PRECEDING AND current row) as t2 
from emp;

[开窗子句]
    默认时行开窗,前导无界到当前行。
    row BETWEEN UNBOUNDED PRECEDING AND current row)            //行开窗
    row BETWEEN UNBOUNDED PRECEDING AND unbounded folloing      //行开窗
    range BETWEEN 1 PRECEDING AND 2 following)                  //范围开窗

select id,name,dep,salary,MAX(salary) OVER (PARTITION BY dep ORDER BY salary range BETWEEN 1 PRECEDING AND 1 following) as t2 from emp;

[具体分析函数]
//排名,有缝
select id,name,dep,salary,rank() OVER (PARTITION BY dep ORDER BY salary desc) as t2 from emp;
//密度排名,无缝
select id,name,dep,salary,dense_rank() OVER (PARTITION BY dep ORDER BY salary desc) as t2 from emp;
select id,name,dep,salary,percent_rank() OVER (PARTITION BY dep ORDER BY salary desc) as t2 from emp;
select id,name,dep,salary,ntile(4) OVER (PARTITION BY dep ORDER BY salary desc) as t2 from emp;
//前查询
select id,name,dep,salary,lag(salary,2,‘-1000‘) OVER (PARTITION BY dep ORDER BY salary desc) as t2 from emp;
//后查
select id,name,dep,salary,lead(salary,2,‘-1000‘) OVER (PARTITION BY dep ORDER BY salary desc) as t2 from emp;

[索引]
CREATE INDEX idx_emp_name ON TABLE emp(name) AS ‘COMPACT‘ WITH DEFERRED REBUILD;
ALTER INDEX idx_emp_name ON employee REBUILD;

[数据文件]
1.文件格式,hive支持TEXTFILE, SEQUENCEFILE, RCFILE, ORC, and PARQUET.
修改文件格式:
//创建表
CREATE TABLE… STORED AS

[压缩]
1.设置mr中间的压缩处理
//设置中间结果是否可压缩,多次job的中间结果是否压缩处理。
SET hive.exec.compress.intermediate=true
//设置结果压缩编解码器
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
//是否控制最终job的输出压缩
SET hive.exec.compress.output=true
//设置输出压缩编解码器
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
Compression_Codec Extension Splittable
Deflate org.apache.hadoop.io.compress.DefaultCodec .deflate N
GZip org.apache.hadoop.io.compress.GzipCodec .gz N
Bzip2 org.apache.hadoop.io.compress.BZip2Codec .gz Y
LZO com.hadoop.compression.lzo.LzopCodec .lzo N(preprocess,lzoindex)
LZ4 org.apache.hadoop.io.compress.Lz4Codec .lz4 N
Snappy org.apache.hadoop.io.compress.SnappyCodec .snappy N

//输出压缩
mapreduce.output.fileoutputformat.compress=false
hive.exec.compress.output=true

合并小文件

-- 每个Map最大输入大小,决定合并后的文件数
set mapred.max.split.size=256000000;
-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
-- 执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 

?

采样

//
SELECT * FROM emp TABLESAMPLE(BUCKET 3 OUT OF 32 ON rand()) s;
SELECT * FROM emp TABLESAMPLE(BUCKET 3 OUT OF 16 ON id) ;
SELECT * FROM emp TABLESAMPLE(50 PERCENT) s ;
SELECT * FROM emp TABLESAMPLE(100M) s; 
SELECT * FROM emp TABLESAMPLE(10 ROWS);

查询语句执行构成

select ... from ... where ... group by ... having ... order by ...  limit ... ;
执行顺序:
1.from
2.where
3.select
4.group by
5.having 
6.order by
7.limit ...

JOB : M+/R?M*
hive
----------------------
select ... from ... where ... group by ... having ... order by ... limit ... ;
[生成mr]
group by //分区(map) + 分组(reduce)
group by having //分区(map) + 分组(reduce) + map(fiilter)
order by //1个reduce + sort
disintinct
join

    //分区           //排序对比器
    distributed by + sort by

explain(解释执行计划)

explain select dep,max(salary) mx from emp group by dep having mx > 4000 ;

OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: emp
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: dep (type: string), salary (type: int)
outputColumnNames: dep, salary
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: max(salary)
keys: dep (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: int)
Reduce Operator Tree:
Group By Operator
aggregations: max(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col1 > 4000) (type: boolean)
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 203 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink


1.气温数据上传到temps表,使用distribute by + sort by 实现全排序。见hive基础1 最后一节























































































































以上是关于hive基础2的主要内容,如果未能解决你的问题,请参考以下文章

Hive基础-- Hive Catalog

Hive基础02安装Hive

hive 基础知识

hive基础及系统架构

JSP基础

Hive基础使用