在 Spark SQL 中将多个小表与大表连接的最佳方法

Posted

技术标签:

【中文标题】在 Spark SQL 中将多个小表与大表连接的最佳方法【英文标题】:Best way to join multiples small tables with a big table in Spark SQL 【发布时间】:2018-07-24 02:25:45 【问题描述】:

我正在使用 spark sql 连接多个表。其中一张表非常大,其他表很小(10-20 条记录)。真的我想用其他包含键值对的表来替换最大表中的值。

即 大表:

| Col 1 | Col 2 | Col 3 | Col 4 | ....
--------------------------------------
| A1    | B1    | C1    | D1    | ....
| A2    | B1    | C2    | D2    | ....
| A1    | B1    | C3    | D2    | ....
| A2    | B2    | C3    | D1    | ....
| A1    | B2    | C2    | D1    | ....
.
.
.
.
.

表2:

| Col 1 | Col 2 
----------------
| A1    | 1a    
| A2    | 2a    

表3:

| Col 1 | Col 2 
----------------
| B1    | 1b    
| B2    | 2b  

表3:

| Col 1 | Col 2 
----------------
| C1    | 1c    
| C2    | 2c  
| C3    | 3c

表4:

| Col 1 | Col 2 
----------------
| D1    | 1d    
| D2    | 2d  

预期的表是

| Col 1 | Col 2 | Col 3 | Col 4 | ....
--------------------------------------
| 1a    | 1b    | 1c    | 1d    | ....
| 2a    | 1b    | 2c    | 2d    | ....
| 1a    | 1b    | 3c    | 2d    | ....
| 2a    | 2b    | 3c    | 1d    | ....
| 1a    | 2b    | 2c    | 1d    | ....
.
.
.
.
.

我的问题是; 这是加入表格的最佳方式。 (想想有100个或更多的小桌子) 1)收集小数据帧,将其转换为地图,广播地图和转换大数据帧只需一步

bigdf.transform(ds.map(row => (small1.get(row.col1),.....)

2) 广播表格并使用 select 方法进行连接。

spark.sql("
       select * 
       from bigtable
       left join small1 using(id1) 
       left join small2 using(id2)")

3) 广播表和连接多个连接

bigtable.join(broadcast(small1), bigtable('col1') ==small1('col1')).join...

提前致谢

【问题讨论】:

【参考方案1】:

你可以这样做:

    广播所有小表(通过设置spark.sql.autoBroadcastJoinThreshold略高于小表行数自动完成)

    运行一个连接大表的sql查询

    val df = spark.sql("
               select * 
               from bigtable
               left join small1 using(id1) 
               left join small2 using(id2)")
    

编辑: 在 sql 和 spark "dataframe" 语法之间进行选择: sql 语法比 spark 语法更易读,更简洁(从数据库用户的角度来看)。 从开发者的角度来看,dataframe 语法可能更具可读性。

使用“数据集”语法的主要优点是编译器将能够跟踪一些错误。使用任何字符串语法,例如 sql 或列名 (col("mycol")) 将在运行时被发现。

【讨论】:

感谢您的回复。最好使用,sql或者使用join sql函数。 dataframe.join(.....)【参考方案2】:

如果您的小表中的数据小于阈值大小并且您的数据的物理文件是 parquet 格式,那么 spark 将自动广播小表,但如果您正在从其他一些数据源(如 sql、PostgreSQL)读取数据等等,然后有时 spark 不会自动广播表格。

如果您知道表的大小很小并且预计表的大小不会增加(如果是查找表),您可以显式广播数据框或表,这样您就可以有效地将更大的表与小桌子。

您可以使用数据框上的 explain 命令验证小表是否正在广播,也可以从 Spark UI 中进行。

【讨论】:

【参考方案3】:

最好的方法,正如已经写在答案中的那样,广播所有小桌子。也可以使用BROADCAST 提示在 SparkSQL 中完成:

val df = spark.sql("""
    select /*+ BROADCAST(t2, t3) */
        * 
    from bigtable t1
        left join small1 t2 using(id1) 
        left join small2 t3 using(id2)
""")

【讨论】:

以上是关于在 Spark SQL 中将多个小表与大表连接的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

HIVE优化场景七--数据倾斜--group by 倾斜

Spark的Join连接

SQL性能问题.现在表设计可以把一个大表按类型(各类型字段不相同)拆分成多个小表.拆分后比较方便.

MySQL表连接之驱动表与被驱动表

表连接中的驱动表与被驱动表

hive大表和小表MapJoin关联查询优化