在 Spark SQL Query 中通过 Repartition 重用 Exchange
Posted
技术标签:
【中文标题】在 Spark SQL Query 中通过 Repartition 重用 Exchange【英文标题】:Reuse Exchanges by Repartition in Spark SQL Query 【发布时间】:2020-06-30 04:12:15 【问题描述】:我正在尝试了解 Spark (2.4) 物理计划。我们通过 SQL API 与 Spark 交互。
我正在使用以下 sql。 sql 在步骤 1 中有一个聚合,在下一步中有一个 join
操作。我的意图是在聚合步骤之前 repartition
源表,以便我可以在以下步骤(在 SM-Join 步骤之前)中重复使用此 Exchange
并避免 Shuffles
(Exchanges
),但它由于 Spark 在 SMJ 之前添加了Exchanges
,因此没有按照我的预期工作。你能帮我理解我哪里出错了。
create or replace temporary view prsn_dtl as
select
policy_num,
prsn_id,
eff_dt,
from db.person_details
cluster by policy_num;
create or replace temporary view plcy_dtl as
select
policy_num,
role_desc,
prsn_actv_flg
from plcy_detail;
create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) eff_dt
from prsn_dtl
group by 1, 2;
select
keys.policy_num,
keys.prsn_id,
keys.eff_dt,
plcy.role_desc,
plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;
在 DAG 表示中,我找到了 3 个Exchanges
(2
在左分支,1
在右分支)-
步骤 1) 第一个 hashpartitioning(policy_num#92, 200)
由于手动 cluster by
在 aggregate
之前
第 2 步)第二个是在 hashpartitioning(policy_num#163, prsn_id#164, 200)
上的 Aggregate
运算符之间
步骤3)最后hashpartitioning(policy_num#163)
在sort-merge Join之前
我的问题:
为什么上面第 1 步中的 Exchange
(来自 cluster by
)没有向下游传播,并且在 Sort-Merge Join 之前没有在第 3 步中重复使用。
我的期望是 Spark 将重用步骤 1 中的 Exchange
(cluster by
),并且不会在步骤 3 中添加另一个 Exchange
(在 SMJ 之前),因为左分支在 policy_num 上是 repartitioned
在查询的早期。
谁能解释我哪里出错了。任何帮助表示赞赏。
注意:我使用的是 Spark 2.4
谢谢
【问题讨论】:
【参考方案1】:通过不重命名查询中的列并将相同的列传播到下游查询,该问题似乎得到了缓解。
【讨论】:
以上是关于在 Spark SQL Query 中通过 Repartition 重用 Exchange的主要内容,如果未能解决你的问题,请参考以下文章
在spring data jpa中通过@Query更新Entity内部的关联对象
在 Apache Spark 中通过管道运行 Windows 批处理文件
在Java应用中通过SparkLauncher启动Spark任务
如何在 Spark 1.3 中通过 Hive 指示 SparkSQL 中的数据库