Hive cluster by vs order by vs sort by

Posted

技术标签:

【中文标题】Hive cluster by vs order by vs sort by【英文标题】: 【发布时间】:2012-11-22 19:05:32 【问题描述】:

据我所知;

sort by 仅在 reducer 中排序

order by 在全球范围内订购东西,但将所有东西都塞到一个减速器中

cluster by 智能地通过 key hash 将内容分配到 reducer 中并进行排序

所以我的问题是集群是否保证全局顺序?分发方式将相同的键放入相同的减速器中,但是相邻的键呢?

我能找到的唯一文档是here,从这个例子看来,它似乎是在全球范围内订购它们。但从定义上看,我觉得它并不总是那样。

【问题讨论】:

【参考方案1】:

更简短的回答:是的,CLUSTER BY 保证全局排序,前提是您愿意自己加入多个输出文件。

更长的版本:

ORDER BY x:保证全局排序,但通过仅通过一个 reducer 推送所有数据来做到这一点。对于大型数据集,这基本上是不可接受的。您最终会得到一个已排序的文件作为输出。 SORT BY x:在 N 个 reducer 中的每一个上排序数据,但每个 reducer 可以接收重叠范围的数据。您最终会得到 N 个或更多具有重叠范围的排序文件。 DISTRIBUTE BY x:确保 N 个减速器中的每一个都获得 x 的非重叠范围,但不对每个减速器的输出进行排序。您最终会得到 N 个或更多范围不重叠的未排序文件。 CLUSTER BY x:确保 N 个减速器中的每一个都获得不重叠的范围,然后在减速器处按这些范围排序。这为您提供了全局排序,并且与(DISTRIBUTE BY xSORT BY x)相同。您最终会得到 N 个或更多具有非重叠范围的排序文件。

有意义吗?所以CLUSTER BY 基本上是ORDER BY 的可扩展版本。

【讨论】:

正如其他答案所述,根据cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy、CLUSTER BYDISTRIBUTE BY 不能给你不重叠的范围。 CLUSTER BY 无法保证全局排序。 我想知道...什么被认为是“大型数据集”?你能量化一下吗? 我的查询为 SORT BYCLUSTER BY 返回了相同的不想要的东西:减速器中的本地排序。我不得不求助于ORDER BY 并等待整个周末直到工作完成。 CLUSTER BY 使用集群列的哈希 mod 减速器数量来确保具有相同列值的行进入同一个减速器 - 就是这样,没有比这更强大的保证了!请参阅我的答案,其中包含示例和订单保留哈希等链接。 我的想法也和@yhuai一样。 lars-yencken,你能提供任何参考吗?【参考方案2】:

首先让我澄清一下:clustered by 仅将您的密钥分配到不同的存储桶中,clustered by ... sorted by 将存储桶排序。

通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局订单。原因是默认分区器使用哈希码拆分键,而不考虑实际的键顺序。

但是,您可以让您的数据完全有序。

动机是 Tom White 的“Hadoop:权威指南”(第 3 版,第 8 章,第 274 页,Total Sort),他在其中讨论了 TotalOrderPartitioner。

我将首先回答您的 TotalOrdering 问题,然后描述我所做的几个与排序相关的 Hive 实验。

请记住:我在这里描述的是“概念证明”,我能够使用 Claudera 的 CDH3 发行版处理一个示例。

最初我希望 org.apache.hadoop.mapred.lib.TotalOrderPartitioner 可以解决问题。不幸的是,它没有,因为它看起来像 Hive 的值分区,而不是键。所以我修补它(应该有子类,但我没有时间):

替换

public int getPartition(K key, V value, int numPartitions) 
    return partitions.findPartition(key);

public int getPartition(K key, V value, int numPartitions) 
    return partitions.findPartition(value);

现在您可以将 TotalOrderPartitioner 设置为(已修补)作为您的 Hive 分区器:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

我也用过

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

在我的测试中。

文件 out_data2 告诉 TotalOrderPartitioner 如何存储值。 您通过对数据进行采样来生成 out_data2。在我的测试中,我使用了 4 个桶和从 0 到 10 的密钥。我使用 ad-hoc 方法生成了 out_data2:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool
    public static void main(String[] args) throws Exception
            ToolRunner.run(new TotalPartitioner(), args);
    

    @Override
    public int run(String[] args) throws Exception 
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]1,3, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]1, 6, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]1, 9, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    


然后我将生成的 out_data2 复制到 HDFS(到 /user/yevgen/out_data2)

通过这些设置,我对数据进行了分桶/排序(请参阅我的实验列表中的最后一项)。

这是我的实验。

创建样本数据

bash> echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt

创建基本测试表:

hive> 创建表 test(x int); hive> 将数据本地 inpath 'data.txt' 加载到表测试中;

这个表基本上包含从 0 到 9 的值,没有顺序。

演示表复制是如何工作的(真正的 mapred.reduce.tasks 参数设置了要使用的最大减少任务数)

hive> 创建表 test2(x int);

hive> 设置 mapred.reduce.tasks=4;

hive> 插入覆盖表 test2 从测试 a 中选择 a.x 加入测试b 在 a.x=b.x 上; -- stupied join 强制非平凡的 map-reduce

bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0

1

5

9

演示分桶。您可以看到键是随机分配的,没有任何排序顺序:

hive> 创建表 test3(x int) 由 (x) 聚类成 4 个桶;

hive> 设置 hive.enforce.bucketing = true;

hive> 插入覆盖表 test3 从测试中选择 *;

bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0

4

8

0

带有排序的分桶。结果是部分排序的,不是完全排序的

hive> 创建表 test4(x int) 按 (x) 聚类 按 (x desc) 排序 分成 4 个桶;

hive> 插入覆盖表 test4 从测试中选择 *;

bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0

1

5

9

您可以看到值是按升序排序的。看起来像 CDH3 中的 Hive 错误?

在没有集群的情况下通过语句进行部分排序:

hive> 创建表 test5 为 选择 x 从测试 按 x 分配 按 x desc 排序;

bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0

9

5

1

使用我打过补丁的 TotalOrderParitioner:

hive> 设置 hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> 设置 total.order.partitioner.natural.order=false

hive> 设置 total.order.partitioner.path=/user/training/out_data2

hive> 创建表 test6(x int) 按 (x) 聚类,按 (x) 排序,分成 4 个桶;

hive> 插入覆盖表 test6 从测试中选择 *;

bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0

1

2

0

bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0

3

4

5

bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0

7

6

8

bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0

9

【讨论】:

【参考方案3】:

CLUSTER BY 不会产生全局排序。

接受的答案(由 Lars Yencken 撰写)通过声明减速器将接收非重叠范围来误导。正如 Anton Zaviriukhin 正确指向 BucketedTables 文档一样,CLUSTER BY 基本上是 DISTRIBUTE BY(与 bucketing 相同)加上每个 bucket/reducer 中的 SORT BY。并且 DISTRIBUTE BY 只是简单地将哈希和 mods 放入存储桶中,而哈希函数 may 保留顺序(i 的哈希 > j 的哈希,如果 i > j),哈希值的 mod 不会。

这是显示重叠范围的更好示例

http://myitlearnings.com/bucketing-in-hive/

【讨论】:

我同意你的观点,即使是 hive 文档也没有提到'distribute by'进行全局排序。【参考方案4】:

据我了解,简短的回答是否定的。 你会得到重叠的范围。

来自SortBy documentation: “Cluster By 是 Distribute By 和 Sort By 的捷径。” “具有相同 Distribute By 列的所有行都将进入同一个 reducer。” 但是没有保证不重叠范围分布的信息。

此外,来自DDL BucketedTables documentation: “Hive 如何在存储桶中分配行?通常,存储桶编号由表达式 hash_function(bucketing_column) mod num_buckets 确定。” 我认为 Select 语句中的 Cluster by 使用相同的原则在 reducer 之间分配行,因为它的主要用途是用数据填充分桶表。

我创建了一个包含 1 个整数列“a”的表,并在其中插入了从 0 到 9 的数字。

然后我将减速器的数量设置为 2 set mapred.reduce.tasks = 2;.

select 来自该表的数据与Cluster by 子句 select * from my_tab cluster by a;

并收到了我预期的结果:

0
2
4
6
8
1
3
5
7
9

所以,第一个 reducer(数字 0)得到了偶数(因为它们的模式 2 给出了 0)

第二个减速器(编号 1)得到奇数(因为它们的模式 2 给出 1)

这就是“分发者”的工作原理。

然后“排序依据”对每个reducer内部的结果进行排序。

【讨论】:

【参考方案5】:

用例:当有一个大数据集时,应该像 sort by 那样进行 sort by ,所有 set reducer 在合并之前在内部对数据进行排序,从而提高性能。在按顺序排列时,较大数据集的性能会降低,因为所有数据都通过单个减速器传递,这会增加负载并因此需要更长的时间来执行查询。 请参见下面关于 11 节点集群的示例。

这个是Order By example output

这是Sort By示例输出

这个是Cluster By example

我观察到,sort by、cluster by和distribute by的数字是SAME,但内部机制不同。在 DISTRIBUTE BY 中:相同的列行将转到一个 reducer,例如。 DISTRIBUTE BY(City) - 一列中的班加罗尔数据,一个减速器中的德里数据:

【讨论】:

【参考方案6】:

Cluster by 是每个 reducer 排序的,不是全局的。在许多书中,它也被错误地或混淆地提及。它有特殊用途,例如您将每个部门分配给特定的减速器,然后按每个部门中的员工姓名排序,并且不关心要使用的集群的部门顺序,并且由于工作负载分布在减速器之间,它的性能更高.

【讨论】:

如果你在distribute by后使用collect_set或collect_list,会保持顺序吗?【参考方案7】:

SortBy:N个或更多具有重叠范围的排序文件。

OrderBy:单输出,即完全排序。

Distribute By:Distribute By 保护 N 个 reducer 中的每一个,获得列的非重叠范围,但不对每个 reducer 的输出进行排序。

更多信息http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/

ClusterBy:参考上面同一个例子,如果我们使用Cluster By x,两个reducer会进一步对x上的行进行排序:

【讨论】:

【参考方案8】:

如果我理解正确的话

1.sort by - 只对reducer内的数据进行排序

2.order by - 通过将整个数据集推送到单个 reducer 来全局排序。如果我们确实有很多数据(倾斜),这个过程会花费很多时间。

    cluster by - 通过 key hash 智能地将内容分配到 reducer 并进行排序,但不授予全局排序。一个 key(k1) 可以放入两个 reducer。第一个 reducer 获得 10K K1 数据,第二个可能获得 1K k1 数据。

【讨论】:

您的所有观点都已详细包含在已接受的答案中

以上是关于Hive cluster by vs order by vs sort by的主要内容,如果未能解决你的问题,请参考以下文章

Hive cluster by vs order by vs sort by

Hive之cluster by , distribute by,order by,sort by

Hive中sort by,order by,cluster by,distribute by总结

Hive中order by,sort by,distribute by,cluster by的区别

hive中order by,sort by, distribute by, cluster by的用法

hive中order by,sort by, distribute by, cluster by作用以及用法