SparkSQL join

Posted 坤岭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL join相关的知识,希望对你有一定的参考价值。

目录

SparkSQL join_type

INNER JOIN

CROSS JOIN

LEFT [ OUTER ] JOIN

[ LEFT ] SEMI JOIN

[ LEFT ] ANTI JOIN

RIGHT [ OUTER ] JOIN

FULL [ OUTER ] JOIN

SparkSQL Join实现

Join基本实现流程

inner join

left outer join

right outer join

full outer join

left semi join

left anti join

SPARK JOIN策略

Sort Merge Join

Broadcast Hash Join

Shuffle Hash Join

Cartesian Join

Broadcast Nested Loop Join


SparkSQL join_type

INNER JOIN

内部联接是 Spark SQL 中的默认联接。它选择在两个关系中具有匹配值的行。

inner join 只会返回两个表的交集

等同于 a left join b on a.字段 is not null and b.字段 is not null

CROSS JOIN

交叉连接返回两个关系的笛卡尔积。

如果不带WHERE条件子句,它将会返回被连接的两个表的笛卡尔积,返回结果的行数等于两个表行数的乘积笛卡尔乘积是指在数学中,两个集合XY的笛卡尓积(Cartesian product),又称直积,表示为X × Y,第一个对象是X的成员而第二个对象是Y的所有可能有序对的其中一个成员

LEFT [ OUTER ] JOIN

左连接返回左关系中的所有值和右关系中的匹配值,或者如果没有匹配则附加 NULL。它也称为左外部连接。

left join 是sql 中使用频率最高的一种连接方式,但是也是比较容易出错的一种连接方式

[ LEFT ] SEMI JOIN

半连接返回与右侧匹配的关系左侧的值。它也称为左半连接。

结果等同于 a left join b on b.字段 is not null

left semi join最主要的使用场景就是解决exist in。LEFT SEMI JOIN (左半连接)是 IN/EXISTS 子查询的一种更高效的实现。

注意:left semi join的JOIN 子句中右边的表只能在 ON 子句中设置过滤条件,在 WHERE 子句、

SELECT 子句或其他地方过滤都不行。

left semi join中最后 select 的结果只许出现左表的那些列

left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,而 join 则会一直遍历。这就导致右

表有重复值得情况下 left semi join 只产生一条,join 会产生多条,也会导致 left semi join 的性能更高。

[ LEFT ] ANTI JOIN

反连接从与右侧不匹配的左侧关系返回值。它也被称为左反连接。

结果等同于 a left join b on b.字段 is null

a left anti join b 的功能是在查询过程中,剔除a表中和b表有交集的部分

其实inner join 和 left anti join 的效果功能,都可以使用left join 最后在where中加以限制的方式进行实现, 但是这样的做法会导致查询效率变低;

测试中left anti join之后就不能再用b表的字段

在关联右表的时候,直接用表名比先把表查出来效率更快

RIGHT [ OUTER ] JOIN

右连接返回右关系中的所有值和左关系中的匹配值,或者如果没有匹配则附加 NULL。它也称为右外连接。

FULL [ OUTER ] JOIN

完全连接返回两个关系的所有值,在没有匹配的一侧附加 NULL 值。它也称为完全外部联接。

SparkSQL Join实现

Join三个要素:Join方式、Join条件以及过滤条件。其中过滤条件也可以通过AND语句放在Join条件中。

Join基本实现流程

Join的基本实现流程如下图所示,Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

在实际计算时,spark会基于streamIter来遍历,每次取出streamIter中的一条记录rowA,根据Join条件计算keyA,然后根据该keyA去buildIter中查找所有满足Join条件(keyB==keyA)的记录rowBs,并将rowBs中每条记录分别与rowAjoin得到join后的记录,最后根据过滤条件得到最终join的记录。

inner join

inner join是一定要找到左右表中满足join条件的记录,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。在查找阶段,如果右表不存在满足join条件的记录,则跳过。

left outer join

left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都

为null的记录。在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。

(之前不是有说过尽量让小表在前??)

right outer join

right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都

为null的记录。右表是streamIter,左表是buildIter,一般让大表在右边,小表在左边。

full outer join

full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter

left semi join

left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否

则返回null。

left anti join

left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返

回null,否则仅返回左边的记录。

SPARK JOIN策略

Sort Merge Join

spark默认的,两张大表进行join时候使用,小表不进行配置Broadcast也会触发

主要包括三个阶段:

  • Shuffle 阶段:两张大表根据Join key进行Shuffle重分区
  • Sort 阶段: 每个分区内的数据进行排序
  • Merge 阶段: 对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连接具有相同Join key值的行来合并数据集

参数:spark.sql.join.prefersortmergeJoin

在shuffle read阶段,分别对streamIter和buildIter进行merge sort,在遍历streamIter时,对于每条

记录,都采用顺序查找的方式从buildIter查找对应的记录

SELECT /*+ MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key

Broadcast Hash Join

当有一张表比较小的时候可以使用,比如事实表和维表进行join,可以提高join的效率

主要包括两个阶段:

  • broadcast阶段:将小表广播分发到大表所在的所有主机。涉及到不同的广播算法
  • hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探。

参数:spark.sql.autoBroadcastJoinThreshold

直接将buildIter广播到每个计算节点,然后将buildIter放到hash表中

源码具体实现:driver端根据表的统计信息,当发现一张小表达到广播条件的时候,就会将小表collect到driver端,然后构建一个HashedRelation,然后广播。

SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

Shuffle Hash Join

当要JOIN的表数据量比较大时使用,可以将大表按照JOIN的key进行重分区,保证每个相同的

JOIN key都发送到同一个分区中

主要包括两个阶段:

  • shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点
  • hash join阶段:每个分区节点上的数据单独执行单机hash join算法。

hash join实现方式,在shuffle read阶段不对记录排序,将来自buildIter的记录放到hash表中

具体实现:分治思想,将两张表按照相同的hash分区器及分区数进行,对join条件进行分区,需要join的key就会落入相同的分区里,然后就可以利用本地join的策略来进行join了。

SELECT /*+ SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key

需要注意以下四个条件:

  • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上

Cartesian Join

两张表在join的时候没有join key可以使用

Broadcast Nested Loop Join

没有其他的更好的方式可以使用

以上是关于SparkSQL join的主要内容,如果未能解决你的问题,请参考以下文章

SparkSQL join

SparkSQL大数据实战:揭开Join的神秘面纱

SparkSQL的3种Join实现

SparkSQL的3种Join实现

SparkSQL的3种Join实现

SparkSQL大数据实战:揭开Join的神秘面纱