在火花中联合后再次排序的蜂巢表排序

Posted

技术标签:

【中文标题】在火花中联合后再次排序的蜂巢表排序【英文标题】:Sorted hive tables sorting again after union in spark 【发布时间】:2019-04-05 10:18:01 【问题描述】:

我使用 spark 2.3.1 使用以下参数启动 spark-shell:

--master='local[*]' --executor-memory=6400M --driver-memory=60G --conf spark.sql.autoBroadcastJoinThreshold=209715200 --conf spark.sql.shuffle.partitions=1000 --conf spark.local.dir=/data/spark-temp --conf spark.driver.extraJavaOptions='-Dderby.system.home=/data/spark-catalog/'

然后用 sort 和 buckets 创建两个 hive 表

第一个表名 - table1

第二个表名 - table2

val storagePath = "path_to_orc"
val storage = spark.read.orc(storagePath)
val tableName = "table1"

sql(s"DROP TABLE IF EXISTS $tableName")
storage.select($"group", $"id").write.bucketBy(bucketsCount, "id").sortBy("id").saveAsTable(tableName)

(table2的代码相同)

我希望当我将这些表中的任何一个与另一个 df 连接起来时,查询计划中没有不必要的 Exchange 步骤

然后我关闭广播以使用 SortMergeJoin

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

我拿了一些 df

val sample = spark.read.option("header", "true).option("delimiter", "\t").csv("path_to_tsv")

val m = spark.table("table1")
sample.select($"col" as "id").join(m, Seq("id")).explain()

== Physical Plan ==
*(4) Project [id#24, group#0]
+- *(4) SortMergeJoin [id#24], [id#1], Inner
   :- *(2) Sort [id#24 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#24, 1000)
   :     +- *(1) Project [col#21 AS id#24]
   :        +- *(1) Filter isnotnull(col#21)
   :           +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
   +- *(3) Project [group#0, id#1]
      +- *(3) Filter isnotnull(id#1)
         +- *(3) FileScan parquet default.table1[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>

但是当我在加入之前对两个表使用联合时

val m2 = spark.table("table2")
val mUnion = m union m2
sample.select($"col" as "id").join(mUnion, Seq("id")).explain()

== Physical Plan ==
*(6) Project [id#33, group#0]
+- *(6) SortMergeJoin [id#33], [id#1], Inner
   :- *(2) Sort [id#33 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#33, 1000)
   :     +- *(1) Project [col#21 AS id#33]
   :        +- *(1) Filter isnotnull(col#21)
   :           +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
   +- *(5) Sort [id#1 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1, 1000)
         +- Union
            :- *(3) Project [group#0, id#1]
            :  +- *(3) Filter isnotnull(id#1)
            :     +- *(3) FileScan parquet default.membership_g043_append[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
            +- *(4) Project [group#4, id#5]
               +- *(4) Filter isnotnull(id#5)
                  +- *(4) FileScan parquet default.membership_g042[group#4,id#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>

在这种情况下出现了排序和分区(第 5 步)

如何在不排序和交换的情况下合并两个 hive 表

【问题讨论】:

【参考方案1】:

据我所知,spark在加入时不考虑排序,只考虑分区。因此,为了获得有效的连接,您必须按同一列进行分区。这是因为排序并不能保证具有相同键的记录最终会出现在同一个分区中。 Spark 必须确保所有具有相同值的键都被洗牌到同一个分区和来自多个数据帧的同一个执行器上。

【讨论】:

以上是关于在火花中联合后再次排序的蜂巢表排序的主要内容,如果未能解决你的问题,请参考以下文章

在已经对选定表进行排序时优化联合所有订单

在火花对 RDD 中按值排序

在火花数据框中的每一行的地图类型列中按键排序

MySQL的外键,修改表,基本数据类型,表级别操作,其他(条件,通配符,分页,排序,分组,联合,连表操作)

在满足条件之前返回行值的排序行的火花聚合

在asp.net mvc中删除数据成功后重新排序自动编号列JQuery Datatable