Hive优化

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive优化相关的知识,希望对你有一定的参考价值。

参考技术A 注意:以下SQL不会转为Mapreduce来执行,Explain用于显示执行计划,可以来验证sql是否发生mapreduce

        select仅查询本表字段;

        where仅对本表字段做条件过滤;

比如下面的语句是会发生mapreduce的;(下面的reduce没有截图)

        (1)集群模式:hive默认采用的是集群的方式;

        (2)本地模式:首先开启本地模式,测试的时候就可以以本地模式来节省集群资源;

                set hive.exec.mode.local.auto=true;

        注意:hive.exec.mode.local.auto.inputbytes.max默认值为128M表示加载文件的最大值,若大于该配置仍会以集群方式来运行;

通过设置以下参数开启并行模式:默认是不开启并行计算,这是job是线性执行的;

set hive.exec.parallel=true;多个job并行执行提高效率;

注意:hive.exec.parallel.thread.number(一次SQL计算中允许并行执行的job个数的最大值);

通过设置以下参数开启严格模式:

set hive.mapred.mode=strict;(默认为:nonstrict非严格模式)

开启严格模式后,查询会有限制:

    (1)对于分区表,必须添加where对于分区字段的条件过滤,因为hive中的数据量一般都很大,避免全表扫描不添加会执行失败,非分区表正常查询;

    (2)order by语句必须包含limit输出限制;还是为了避免全表扫描

    (3)限制执行笛卡尔积的查询。

(1)Order By - 对于查询结果做全排序,只允许有一个reduce处理(当数据量较大时,应慎用。严格模式下,必须结合limit来使用);

(2)Sort  By - 对于单个reduce的数据进行排序

(3)Distribute By - 分区排序,经常和Sort By结合使用

(4)Cluster By - 相当于 Sort By +Distribute By(Cluster By不能通过asc、desc的方式指定排序规则;可通过 distribute by column sort by column asc|desc 的方式)

Join计算时,将小表(驱动表)放在join的左边

Map Join:在Map端完成Join,join操作对应mapreduce是reduce阶段,因为shuffle,跟reduce阶段比较浪费时间,所以才有了map  join;

两种实现方式:

(1)SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)

(2)开启自动mapjoin : 通过修改以下配置启用自动的map join:

set hive.auto.convert.join = true;(该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用Map

join)

相关配置参数:

hive.mapjoin.smalltable.filesize; (大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行)

hive.ignore.mapjoin.hint;(默认值:true;是否忽略mapjoin hint 即mapjoin标记)

hive.auto.convert.join.noconditionaltask;(默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin)

hive.auto.convert.join.noconditionaltask.size;(将多个mapjoin转化为一个mapjoin时,其表的最大值)    

通过设置以下参数开启在Map端的聚合:

set hive.map.aggr=true; 开启后,map预聚合,相当于map端reduce减轻reduce 端压力;

相关配置参数:

hive.groupby.mapaggr.checkinterval:map端group by执行聚合时处理的多少行数据(默认:100000)

hive.map.aggr.hash.min.reduction:进行聚合的最小比例(预先对100000条数据做聚合,若聚合之后的数据量/100000的值大于该配置0.5,则不会聚合)

hive.map.aggr.hash.percentmemory:map端聚合使用的内存的最大值

hive.map.aggr.hash.force.flush.memory.threshold:map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush

hive.groupby.skewindata: 是否对GroupBy产生的数据倾斜做优化,默认为false,当设置为true时,会进行两次mr,第一次把数据map端随机分配分区,达到均衡数据的目的,第二次进行正常的分区算法执行mr;

文件数目小,容易在文件存储端造成压力,给hdfs造成压力,影响效率

设置合并属性

        是否合并map输出文件:hive.merge.mapfiles=true

        是否合并reduce输出文件:hive.merge.mapredfiles=true;

        合并文件的大小:hive.merge.size.per.task=256*1000*1000

去重统计

        数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce

Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP

BY再COUNT的方式替换;

由于maptask的数量一般跟切片数量有关,所有我们主要对reduce端设置数量;

Map数量相关的参数

        mapred.max.split.size: 一个split的最大值,即每个map处理文件的最大值

        mapred.min.split.size.per.node:一个节点上split的最小值

        mapred.min.split.size.per.rack:一个机架上split的最小值

Reduce数量相关的参数

        mapred.reduce.tasks:强制指定reduce任务的数量

        hive.exec.reducers.bytes.per.reducer:每个reduce任务处理的数据量

        hive.exec.reducers.max:每个任务最大的reduce数

适用场景:

        (1)小文件个数过多

        (2)task个数过多

通过 set mapred.job.reuse.jvm.num.tasks=n; 来设置(n为task插槽个数);个人理解优点类似于各种连接池的作用;

缺点:设置开启之后,task插槽会一直占用资源,不论是否有task运行,直到所有的task即整个job全部执行完成时,才会释放所有的task插槽资源!

Hive与优化方法

Hive与优化方法



Java、大数据开发学习要点(持续更新中…)


一、Hive概念

  Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能。其本质是,将HQL转化成MapReduce程序。底层数据存储在HDFS上,由于延迟较大所以一般适用于离线大批量的数据计算和分析。
hive流程

二、Hive架构

Hive架构

  1. 用户接口Client
    CLI(hive shell)、JDBC/ODBC(java访问hive)、WEBUI(浏览器访问hive)
  2. 元数据Metastore
    元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore
  3. Hadoop
    使用HDFS进行存储,使用MapReduce进行计算。
  4. 驱动器Driver
    • 解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。
    • 编译器(Physical Plan):将AST编译生成逻辑执行计划。
    • 优化器(Query Optimizer):对逻辑执行计划进行优化。
    • 执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR/Spark。

三、Hive与数据库的比较

  Hive 和数据库除了拥有类似的查询语言,再无类似之处。其实记住Hive是数仓工具就可以将其与数据库区别开来。

  • Hive与传统数据库的区别
  1. 数据更新:由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不建议对数据的改写,所有的数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行更新。
  2. 数据查询:传统数据库数据由于索引的存在,在数据量较小的情况下查询较快,并且自己提供执行引擎。而Hive数据查询是整表或者分区表的扫描,只有在大数据情况下分布式运算才有优势,依靠MR或Spark来执行。
  3. 数据存储:Hive数据存储没有固定的格式,用户可以自己指定存储的格式(Parquet、SequenceFile等),并自己指定压缩格式(Snappy、ORC)。数据库的存储引擎定义了自己的存储格式。
  • Hive与HBase的区别
    其实没有什么可以比较的。HBase是一个分布式列簇式存储KV数据库,Hive是一个数仓工具。Hive擅长于大数据离线计算和分析,而HBase则是提供快速数据写入和查询的数据库应用在实时查询的场景。

四、Hive中一些重要的概念

4.1 内部表和外部表

  内部表生命周期是受Hive控制的,删除内部表则数据和元数据都会被删除将数据导入外部表,数据并不会移动,即使删除外部表,只是删除外部表元数据原来的数据还是会存在

使用的例子,HDFS定期收到用户行为日志文件,在日志文件上建立外部表,中间表和结果表则以内部表的形式存储。

4.2 分区表

  分区表实际上就是对应一个HDFS文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive根据某列或者某些列的值(这些列在表中并不真实存在)将数据分区,放在表文件夹下不同子文件夹中存储。Hive中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。

  • 静态分区和动态分区
  1. 静态分区:在建表中指定分区条件,数据导入或者插入时需要指定分区。
  2. 动态分区:按照某个或某些字段的值不同自动地进行分区,底层实际是利用MapReduce的mutipleOutputs(根据条件判断,将结果写入不同目录不同文件)。
  3. 静态分区必须在动态分区前。
  • 分区的注意事项
    Hive分区过多,导致每个分区的文件小,会导致HDFS小文件过多的问题
    (1)小文件数量过多造成NameNode负担过大。
    (2)Hive运行Mapreduce时,每个block对应一个切片,而小文件则会直接对应一个map任务,使得map任务过多使得运行效率低下(Yarn频繁申请销毁容器)。

4.3 Hive排序关键字

  1. ORDER BY全局排序,强制只有一个Reducer,但是当数据规模较大时,会导致消耗较长的计算时间。

  2. SORT BY局部排序,每个task内部排序,使得reduce结果都是局部有序的

    由此,可以想到全局有序的一个方法,先SORT BY将分组内的数据有序数据,再用ORDER BY使得数据全局有序。实际就是两阶段MapReduce,第一次输出局部有序,第二次输出后全局有序。

  3. DISTRIBUTE BY:类似MR中的Partition分区器,根据某一列进行分区。使用DISTRIBUTE BY+SORT BY来实现分桶排序查询,如:

    hive (default)> set mapreduce.job.reduces=3;
    --根据col1进行分区,再根据col2进行分区内的降序排序
    hive (default)> select col1,col2 
    				from emp distribute by col1 sort by col2 desc;
    
  4. CLUSTER BY:当DISTRIBUTE BY和SORT BY字段相同时,可以使用CLUSTER BY代替,但只能升序排列。

4.4 Hive分桶

  对Hive表分桶可以将表中数据按分桶键的哈希值散列到多个文件中,这些小文件称为桶。

表分区是用不同的子文件夹管理不同的数据;而表分桶用不同的文件管理不同的数据。

  • 分桶的好处:
  1. join两个相同分桶划分的表时可以使用map-side join,优化join查询
  2. 根据某些列进行分桶可以使Hive查询时利用分桶的结构加快查询效率
  3. 对于非常大的数据集,有时用户需要使用的是一个具有代表性的查询结果而不是全部结果。Hive可以通过对表进行抽样来满足这个需求。而分桶的结构恰好满足抽样所需的数据结构,使得抽样更加高效。

4.5 三种排序窗函数的区别

  • RANK() n个排序相同时排名会重复,但下一个排名会跳跃至n个名次开始。(跳跃)
  • DENSE_RANK() n个排序相同时排名会重复,但下一个排名继续上一个排名加1开始。(连续)
  • ROW_NUMBER() 会根据顺序依次编号。

五、Hive调优

5.1 部分场景下尽可能避免启用MR

由于MapReduce的启动任务调度通常在数据集小的情况下耗时比job本身时间要长。所以Hive在有些场景下可以尽量避免启动MR来执行任务。比如数据抓取(Fetch)在全表数据获取、字段查找、limit查找的情况下可以不走MapReduce;再比如数据集较小的情况下,开启本地模式单机处理所有任务也能比走集群计算得到更好的时间效率。

5.2 表的优化

  • 小表JOIN大表
  1. JOIN有个特点是其中一个表需要作为全量读取的表先加载至内存,所以小表写在JOIN左边(当然这点Hive的开发者已经对此进行了优化)
  2. 小表JOIN大表的情况下,尽量使用map-side join,将小表广播到大表所在的map任务中,以减少小表shuffler所带来的IO开销。
  • 大表JOIN大表
  1. 要注意的是大表的数据量基本都比较大,JOIN容易出现reducer的OOM,所以要注意JOIN前数据的过滤与某些空key数据产生的数据倾斜问题(随机赋值)
  • 替换COUNT DISTINCT去重统计
    COUNT DISTINCT通过一个Reducer来完成去重统计,在数据量巨大的场景下效率低下。将COUNT DISTINCT用两阶段进行替换:先GROUP BY再开启一个任务进行COUNT

  • 避免笛卡尔积
    表的无条件JOIN(没有指定ON条件,或条件无效),Hive只能用一个Reducer完成,效率极其低下。

  • 行过滤
    在表的JOIN关联中,将附表的过滤作为子查询写在ON条件之前,否则会导致先关联再过滤的问题产生。

5.3 数据倾斜优化

  1. map-side join来缓解数据倾斜问题
    如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用Map-side Join把小表全部加载到内存在Map端进行join,避免Reducer处理。(参数设置set hive.auto.convert.join = true;默认是true)

  2. Group by开启Map端预聚合
    默认情况下,Map阶段同一Key数据分发给一个Reducer,当一个key数据过大时就倾斜了。并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。两个参数hive.map.aggr = true(默认) 和 hive.groupby.skewindata = true(非默认),分别是开启Map端预聚合和数据倾斜时进行负载均衡。

    当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By的Key分布到Reducer中(这个过程可以保证相同的Key被分布到同一个Reducer中),最后完成最终的聚合操作。

  3. 合理设置Map任务数和Reduce任务数

  • 合理的Map任务数:

    1. 每个小文件对应一个Map任务是不明智的,导致Map任务数过多,且任务启动调度的时间远大于任务逻辑执行的时间。
    2. 每个Map的大小接近128M呢?则会使得单个Map任务的执行时间过长。

    所以,Map任务需要按照场景进行调整,小文件多的情况下减少Map任务并设置Hive的InputFormat为CombineHiveInputFormat;而文件较大的情况下,增加Map数量来分担单文件大数据量的计算压力

  • 合理的Reduce任务数:

    与Map任务类似,Reducer数量也要合理,太多增大调度资源和小文件的产生,过少单个Reduce任务执行时间过长。

5.3 其他优化

  1. 并行执行
    Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。

  2. 严格模式

    1. 对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
    2. 对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。
    3. 限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用WHERE语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
  3. JVM重用

    JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数任务执行时间都很短

  4. 合理压缩

    比如使用Parquet列式存储数据,这种格式按列存储数据,没列数据类型相同,天然对压缩友好,建议可以使用Parquet存储格式+Snappy压缩格式的组合

以上是关于Hive优化的主要内容,如果未能解决你的问题,请参考以下文章

Hive优化之Hive的配置参数优化

Hive与优化方法

Hive与优化方法

Hive与优化方法

Hive优化

Hive优化