Spark的join啥情况下可以避免shuffle?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的join啥情况下可以避免shuffle?相关的知识,希望对你有一定的参考价值。

参考技术A Spark的join操作可能触发shuffle操作。shuffle操作要经过磁盘IO,网络传输,对性能影响比较大。本文聊一聊Spark的join在哪些情况下可以避免shuffle过程。

针对Spark DataFrame/DataSet的join,可以通过broadcast join和bucket join来避免shuffle操作。

Broadcast join很好理解,小表被分发到所有executors,所以不需要做shuffle就可以完成join. Spark SQL控制自动broadcast join的参数是:spark.sql.autoBroadcastJoinThreshold , 默认为10MB. 就是说当join中的一张表的size小于10MB时,spark会自动将其封装为broadcast发送到所有结点,然后进行broadcast join. 当然也可以手动将join中的某张表转化成broadcast : 

                 sparkSession.sparkContext.broadcast(df)

Bucket join其实就是将要join的两张表按照join columns(或join columns的子集)根据相同的partitioner预先做好分区,并将这些分区信息存储到catalog中(比如HiveExternalCatalog);然后在读取这两张表并做join时,spark根据bucket信息将两张表的相同partition进行join即可,从而避免了shuffle的过程。注意,这里是避免了shuffle过程,并没有完全避免网络传输,由于两张表的相同partition不一定在同一台机器上,所以这里仍需要对其中一张表的partition进行网络传输。关于spark bucketing的原理和使用细节可以参见这个 视频 。

笔者这里想讨论的是PairRDDFunctions类的join方法。在RDD对象中有一个隐式转换可以将rdd转换成PairRDDFunctions对象,这样就可以直接在rdd对象上调用join方法:

先来看看PairRDDFunctions的join方法:

PairRDDFunctions有多个重载的join方法,上面这个只带一个RDD对象的参数,我们接着看它调用的另一个重载的join方法:

可以看到,RDD的join实现是由cogroup方法完成的,cogroup完后得到的是类型为RDD[(K, (Iterable[V], Iterable[W]))]的rdd对象,其中K为key的类型,V为第一张join表的value类型,W为第二张join表的value类型;然后,调用flatMapValues将其转换成RDD[(K, V, W)]的rdd对象。

下面来看看PairRDDFunctions.cogroup方法的实现:

cogroup中生成了CoGroupedRDD对象,所以关键是这个RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency.

看看这个RDD的getDependencies方法:

其中的rdds就是进行cogroup的rdd序列,也就是PairRDDFunctions.cogroup方法中传入的 Seq(self, other)  .

重点来了,对于所有参与cogroup的rdd,如果它的partitioner和结果CoGroupedRDD的partitioner相同,则该rdd会成为CoGroupedRDD的一个oneToOne窄依赖,否则就是一个shuffle依赖,即宽依赖。

我们知道,只有宽依赖才会触发shuffle,所以RDD的join可以避免shuffle的条件是: 参与join的所有rdd的partitioner都和结果rdd的partitioner相同。

那么,结果rdd的partitioner是怎么确定的呢?上文讲到PairRDDFunctions.join方法有多个重载,其中就有可以指定partitioner的重载,如果没有指定,则使用默认的partitioner,看看默认的partitioner是怎么确定的:

简单地说就是:

1. 如果父rdds中有可用的合格的partitioner,则直接使用其中分区数最大的那个partitioner;

2. 如果没有,则根据默认分区数生成HashPartitioner.

至于怎样的partitioner是合格的,请读者阅读上面的Partitioner.defaultPartitioner方法和Partitioner.isEligiblePartitioner方法。

RDD的compute方法是真正计算得到数据的方法,我们来看看CoGroupedRDD的compute方法是怎么实现的:

可以看到,CoGroupedRDD的数据是根据不同的依赖从父rdd中获取的:

1. 对于窄依赖,直接调用父rdd的iterator方法获取对应partition的数据

2. 对于宽依赖,从shuffleManager获取shuffleReader对象进行读取,这里就是shuffle read了

还有一个重点是读取多个父rdds的数据后,怎么将这些数据根据key进行cogroup?

这里用到了ExternalAppendOnlyMap来构建key和grouped values的映射。先来看看createExternalMap的实现:

相关类型定义如下:

可以看到,ExternalAppendOnlyMap的构造函数的参数是是三个方法参数:

1. createCombiner : 对每个key创建用于合并values的combiner数据结构,在这里就是一个CoGroup的数据,数组大小就是dependencies的数量

2. mergeValue : 将每个value合并到对应key的combiner数据结构中,在这里就是将一个CoGroupValue对象添加到其所在rdd对应的CoGroup中

3. mergeCombiners : 合并相同key的两个combiner数据结构,在这里就是合并两个CoGroupCombiner对象

CoGroupedRDD.compute会调用ExternalAppendOnlyMap.insertAll方法将从父rdds得到的数据一个一个地插入到ExternalAppendOnlyMap对象中进行合并。

最后,以这个ExternalAppendOnlyMap对象作为参数构造InterruptibleIterator,这个iterator会被调用者用于访问CoGroupedRDD的单个partition的所有数据。

本文简单地介绍了DataFrame/DataSet如何避免join中的shuffle过程,并根据源码详述了RDD的join操作的具体实现细节,分析了RDD的join在什么情况下可以避免shuffle过程。

1. 源码版本:2.4.0

2. 水平有限,如有错误,望读者指出

在 SELECT ... INNER JOIN ... FOR UPDATE 的情况下,字符串的顺序是啥锁定以及如何避免死锁?

【中文标题】在 SELECT ... INNER JOIN ... FOR UPDATE 的情况下,字符串的顺序是啥锁定以及如何避免死锁?【英文标题】:What the sequence of the strings locks in case of SELECT … INNER JOIN … FOR UPDATE and how to avoid deadlock?在 SELECT ... INNER JOIN ... FOR UPDATE 的情况下,字符串的顺序是什么锁定以及如何避免死锁? 【发布时间】:2018-05-05 11:51:13 【问题描述】:

角色 MySQL 5.7.21-20

— Memory temporary table TQueue
CREATE TEMPORARY TABLE IF NOT EXISTS TQueue (
    ID bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT,
    QUEUE_STATUS enum ('ADDED', 'PROCESSED', 'SUCCESS', 'ERROR') NOT NULL DEFAULT 'ADDED',
    QUEUE_TIMEOUT datetime NOT NULL,
    ACTION enum ('INSERT', 'DELETE', 'UPDATE') NOT NULL,
    REPORT_ID tinyint(4) UNSIGNED NOT NULL,
    LOGIN int(11) NOT NULL,
    `GROUP` char(16) NOT NULL,
    ENABLE int(11) NOT NULL,
    ENABLE_CHANGE_PASS int(11) NOT NULL,
    ENABLE_READONLY int(11) NOT NULL,
    ENABLE_OTP int(11) NOT NULL,
    PASSWORD_PHONE char(32) NOT NULL,
    NAME char(128) NOT NULL,
    COUNTRY char(32) NOT NULL,
    CITY char(32) NOT NULL,
    STATE char(32) NOT NULL,
    ZIPCODE char(16) NOT NULL,
    ADDRESS char(128) NOT NULL,
    LEAD_SOURCE char(32) NOT NULL,
    PHONE char(32) NOT NULL,
    EMAIL char(48) NOT NULL,
    COMMENT char(64) NOT NULL,
    ID_DOCUMENT char(32) NOT NULL,
    STATUS char(16) NOT NULL,
    REGDATE datetime NOT NULL,
    LASTDATE datetime NOT NULL,
    LEVERAGE int(11) NOT NULL,
    AGENT_ACCOUNT int(11) NOT NULL,
    TIMESTAMP int(11) NOT NULL,
    BALANCE double NOT NULL,
    PREVMONTHBALANCE double NOT NULL,
    PREVBALANCE double NOT NULL,
    CREDIT double NOT NULL,
    INTERESTRATE double NOT NULL,
    TAXES double NOT NULL,
    SEND_REPORTS int(11) NOT NULL,
    MQID int(10) UNSIGNED NOT NULL,
    USER_COLOR int(11) NOT NULL,
    EQUITY double NOT NULL,
    MARGIN double NOT NULL,
    MARGIN_LEVEL double NOT NULL,
    MARGIN_FREE double NOT NULL,
    CURRENCY char(16) NOT NULL,
    API_DATA blob DEFAULT NULL,
    MODIFY_TIME datetime NOT NULL,
    PRIMARY KEY (ID),
    INDEX IDX_JOIN USING BTREE (LOGIN, REPORT_ID)
  )
  ENGINE = MEMORY;


CREATE TABLE `MT4_USERS` (
  ID bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT,
    REPORT_ID tinyint(4) UNSIGNED NOT NULL,
    LOGIN int(11) NOT NULL,
    `GROUP` char(16) NOT NULL,
    ENABLE int(11) NOT NULL,
    ENABLE_CHANGE_PASS int(11) NOT NULL,
    ENABLE_READONLY int(11) NOT NULL,
    ENABLE_OTP int(11) NOT NULL,
    PASSWORD_PHONE char(32) NOT NULL,
    NAME char(128) NOT NULL,
    COUNTRY char(32) NOT NULL,
    CITY char(32) NOT NULL,
    STATE char(32) NOT NULL,
    ZIPCODE char(16) NOT NULL,
    ADDRESS char(128) NOT NULL,
    LEAD_SOURCE char(32) NOT NULL,
    PHONE char(32) NOT NULL,
    EMAIL char(48) NOT NULL,
    COMMENT char(64) NOT NULL,
    ID_DOCUMENT char(32) NOT NULL,
    STATUS char(16) NOT NULL,
    REGDATE datetime NOT NULL,
    LASTDATE datetime NOT NULL,
    LEVERAGE int(11) NOT NULL,
    AGENT_ACCOUNT int(11) NOT NULL,
    TIMESTAMP int(11) NOT NULL,
    BALANCE double NOT NULL,
    PREVMONTHBALANCE double NOT NULL,
    PREVBALANCE double NOT NULL,
    CREDIT double NOT NULL,
    INTERESTRATE double NOT NULL,
    TAXES double NOT NULL,
    SEND_REPORTS int(11) NOT NULL,
    MQID int(10) UNSIGNED NOT NULL,
    USER_COLOR int(11) NOT NULL,
    EQUITY double NOT NULL,
    MARGIN double NOT NULL,
    MARGIN_LEVEL double NOT NULL,
    MARGIN_FREE double NOT NULL,
    CURRENCY char(16) NOT NULL,
    API_DATA blob DEFAULT NULL,
    MODIFY_TIME datetime NOT NULL,
    PRIMARY KEY (ID),
    UNIQUE KEY IDX_LOGIN_REPORT_ID (`LOGIN`,`REPORT_ID`)
) 
ENGINE=InnoDB 
ROW_FORMAT=COMPRESSED

从内存表TQueue中的队列中选择数据。

我需要插入表 MT4_USERS。应该更新现有的字符串,所以我在 DUPLICATE KEY UPDATE 上做。

为了确保不会发生死锁,我尝试在 INSERT ON DUPLICATE KEY UPDATE 之前进行排他锁。

SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION;
— hold exclusive lock on MT4_USERS table
SELECT 
u.* 
FROM 
TQueue q 
INNER JOIN MT4_USERS u USING (LOGIN, REPORT_ID) 
FOR UPDATE;

— insert to MT4_USERS form TQueue ON DUPLICATE KEY UPDATE
INSERT MT4_USERS (REPORT_ID, LOGIN, `GROUP`, ENABLE …)
SELECT
    REPORT_ID, LOGIN, `GROUP`, ENABLE, ENABLE_CHANGE_PASS …
FROM TQueue
ORDER BY ID ASC
ON DUPLICATE KEY UPDATE
    `GROUP` = VALUES(`GROUP`),
    ENABLE = VALUES(ENABLE),
    ENABLE_CHANGE_PASS = VALUES(ENABLE_CHANGE_PASS),
    …
COMMIT;

理想情况下,我想为表 MT4_USERS 的字符串设置排他锁,该锁与表 TQueue 的字符串及其字段(LOGIN、REPORT)一致,以便在不死锁的情况下执行 INSERT ON DUPLICATE KEY UPDATE。

但是我在并行事务中遇到了死锁。它执行相同的操作,但使用表 MT4_TRADES。它还使 LEFT JOIN MT4_USERS。

SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION;
— hold exclusive lock on MT4_TRADES table
SELECT
t.*
FROM
TQueue q
INNER JOIN MT4_TRADES t USING (TICKET, REPORT_ID)
FOR UPDATE;

— insert to MT4_TRADES form TQueue ON DUPLICATE KEY UPDATE
INSERT MT4_TRADES (REPORT_ID, TICKET, LOGIN, SYMBOL, …)
SELECT
q.REPORT_ID, q.TICKET, q.LOGIN, q.SYMBOL, …
FROM
TQueue q
LEFT JOIN MT4_USERS mu USING(REPORT_ID, LOGIN)
ORDER BY
q.ID ASC
ON DUPLICATE KEY UPDATE
LOGIN = VALUES(LOGIN)
,SYMBOL = VALUES(SYMBOL)
,DIGITS = VALUES(DIGITS)
,…
COMMIT;

SHOW ENGINE INNODB STATUS
------------------------
LATEST DETECTED DEADLOCK
------------------------
2018-05-05 00:24:35 0x7fd53e5a6700
*** (1) TRANSACTION:
TRANSACTION 178168806, ACTIVE 1 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 17 lock struct(s), heap size 1136, 45 row lock(s)
MySQL thread id 17661615, OS thread handle 140555520485120, query id 4591798647 event_scheduler Sending data
SELECT u.* FROM TQueue q INNER JOIN MT4_USERS u USING (LOGIN, REPORT_ID) FOR UPDATE
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 876643 page no 46339 n bits 952 index IDX_LOGIN_REPORT_ID of table `Developer`.`MT4_USERS` trx id 178168806 lock_mode X locks rec but not gap waiting
*** (2) TRANSACTION:
TRANSACTION 178168783, ACTIVE 3 sec starting index read
mysql tables in use 6, locked 4
8870 lock struct(s), heap size 1138896, 22372 row lock(s), undo log entries 1876
MySQL thread id 17661574, OS thread handle 140553850873600, query id 4591797640 event_scheduler
INSERT MT4_TRADES
  (
    REPORT_ID, TICKET, LOGIN, SYMBOL, DIGITS, CMD, VOLUME, OPEN_TIME, OPEN_PRICE, SL, TP, CLOSE_TIME,
    EXPIRATION, REASON, CONV_RATE1, CONV_RATE2, COMMISSION, COMMISSION_AGENT, SWAPS, CLOSE_PRICE,
    PROFIT, TAXES, COMMENT, INTERNAL_ID, MARGIN_RATE, `TIMESTAMP`, MAGIC, GW_VOLUME, GW_OPEN_PRICE,
    GW_CLOSE_PRICE, MODIFY_TIME, CURRENCY, DELIMER, RATE, TICKET_STATUS
  )
  SELECT
    q.REPORT_ID, q.TICKET, q.LOGIN, q.SYMBOL, q.DIGITS, q.CMD, q.VOLUME, q.OPEN_TIME, q.OPEN_PRICE, q.SL, q.TP, q.CLOSE_TIME,
    q.EXPIRATION, q.REASON, q.CONV_RATE1, q.CONV_RATE2, q.COMMISSION, q.COMMISSION_AGENT, q.SWAPS, q.CLOSE_PRICE,
    q.PROFIT, q.TAXES, q.COMMENT, q.INTERNAL_ID, q.MARGIN_RATE, q.`TIMESTAMP`, q.MAGIC, q.GW_VOLUME, q.GW_OPEN_PRICE,
    q.GW_CLOSE_PRICE, q.MODIFY_TIME, mu.CURRENCY, mu.DELIMER, IF(mu.CURRENCY = 'USD', 1, SearchOfRate(q.REPORT_ID, mu.CURRENCY, 'USD', q.MODIFY_TIME, -1)), IF(q.`
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 876643 page no 46339 n bits 952 index IDX_LOGIN_REPORT_ID of table `Developer`.`MT4_USERS` trx id 178168783 lock_mode X locks rec but not gap
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 876643 page no 46338 n bits 952 index IDX_LOGIN_REPORT_ID of table `Developer`.`MT4_USERS` trx id 178168783 lock_mode X locks rec but not gap waiting
*** WE ROLL BACK TRANSACTION (1)

在SELECT ... INNER JOIN ... FOR UPDATE的情况下,字符串的顺序是什么锁以及如何避免死锁?

【问题讨论】:

【参考方案1】:

死锁在您的MT4_USERS 表的索引上,我相信您已经注意到了。这就是为什么应用程序的不同部分中的两个查询显然会发生冲突的原因。

这个查询

SELECT u.* 
  FROM TQueue q 
 INNER JOIN MT4_USERS u USING (LOGIN, REPORT_ID) 
   FOR UPDATE;

锁定整个MT4_USERS 表。为什么?你可以只锁定一排吗?这是SELECT ... FOR UPDATE 的常见用例。

或者也许您应该跳过SELECT FOR UPDATE 锁和事务,而只执行INSERT ... ON DUPLICATE KEY UPDATE 操作独立。如果两个不同的操作更新相同的行,那么后面的操作将确定最终值。单个查询给出一致的结果。

我猜到了一些事情,因为您没有显示死锁中涉及的两个查询。

【讨论】:

"This query locks your entire MT4_USERS table. Can you lock just one row instead?» 我只需要对 TQueue 表中存在的字符串进行排他锁。我不需要阻止整个表,这就是为什么我在«INSERT ... ON DUPLICATE KEY UPDATE»开始执行之前执行«SELECT ... FOR UPDATE»。为什么说查询 «SELECT ... FOR UPDATE» 会阻塞整个表 MT4_USERS? «Or maybe you should skip the SELECT FOR UPDATE lock and the transaction, and just do the INSERT ... ON DUPLICATE KEY UPDATE operation standalone.» 如果没有排他锁,两个并行查询«INSERT ... ON DUPLICATE KEY UPDATE»会出现死锁,很明显,我已经遇到过这种情况。这就是为什么我想提前使用查询«SELECT ... FOR UPDATE»来获取排他锁。 TQueue 表的每个事务可能包含 100 到 10000 行。这就是为什么我不能使用简单的“SELECT ... FOR UPDATE”,我必须使用«SELECT...INNER JOIN..FOR UPDATE»。

以上是关于Spark的join啥情况下可以避免shuffle?的主要内容,如果未能解决你的问题,请参考以下文章

join的spark操作啥时候不会造成shuffle

当 shuffle 分区大于 200 时会发生啥(数据帧中的 spark.sql.shuffle.partitions 200(默认情况下))

在 SELECT ... INNER JOIN ... FOR UPDATE 的情况下,字符串的顺序是啥锁定以及如何避免死锁?

spark 内存溢出处理

Spark Dataframe Join shuffle

第37课:Spark中Shuffle详解及作业