在 Spark SQL Query 中通过 Repartition 重用 Exchange

Posted

技术标签:

【中文标题】在 Spark SQL Query 中通过 Repartition 重用 Exchange【英文标题】:Reuse Exchanges by Repartition in Spark SQL Query 【发布时间】:2020-06-30 04:12:15 【问题描述】:

我正在尝试了解 Spark (2.4) 物理计划。我们通过 SQL API 与 Spark 交互。

我正在使用以下 sql。 sql 在步骤 1 中有一个聚合,在下一步中有一个 join 操作。我的意图是在聚合步骤之前 repartition 源表,以便我可以在以下步骤(在 SM-Join 步骤之前)中重复使用此 Exchange 并避免 Shuffles (Exchanges),但它由于 Spark 在 SMJ 之前添加了Exchanges,因此没有按照我的预期工作。你能帮我理解我哪里出错了。

    create or replace temporary view prsn_dtl as
    select
    policy_num,
    prsn_id,
    eff_dt,
    from db.person_details
    cluster by policy_num;
    
    create or replace temporary view plcy_dtl as
    select
    policy_num,
    role_desc,
    prsn_actv_flg
    from plcy_detail;
    
    create or replace temporary view my_keys as
    select
    policy_num,
    prsn_id,
    max(eff_dt) eff_dt
    from prsn_dtl
    group by 1, 2;
    
    select
    keys.policy_num,
    keys.prsn_id,
    keys.eff_dt,
    plcy.role_desc,
    plcy.prsn_actv_flg
    from my_keys keys
    inner join plcy_dtl plcy
    on keys.policy_num = plcy.policy_num;

在 DAG 表示中,我找到了 3 个Exchanges2 在左分支,1 在右分支)- 步骤 1) 第一个 hashpartitioning(policy_num#92, 200) 由于手动 cluster byaggregate 之前 第 2 步)第二个是在 hashpartitioning(policy_num#163, prsn_id#164, 200) 上的 Aggregate 运算符之间 步骤3)最后hashpartitioning(policy_num#163)在sort-merge Join之前

我的问题: 为什么上面第 1 步中的 Exchange(来自 cluster by)没有向下游传播,并且在 Sort-Merge Join 之前没有在第 3 步中重复使用。

我的期望是 Spark 将重用步骤 1 中的 Exchange (cluster by),并且不会在步骤 3 中添加另一个 Exchange(在 SMJ 之前),因为左分支在 policy_num 上是 repartitioned在查询的早期。

谁能解释我哪里出错了。任何帮助表示赞赏。

注意:我使用的是 Spark 2.4

谢谢

【问题讨论】:

【参考方案1】:

通过不重命名查询中的列并将相同的列传播到下游查询,该问题似乎得到了缓解。

【讨论】:

以上是关于在 Spark SQL Query 中通过 Repartition 重用 Exchange的主要内容,如果未能解决你的问题,请参考以下文章

在spring data jpa中通过@Query更新Entity内部的关联对象

在 Apache Spark 中通过管道运行 Windows 批处理文件

在Java应用中通过SparkLauncher启动Spark任务

如何在 Spark 1.3 中通过 Hive 指示 SparkSQL 中的数据库

如何在代码中通过API监控Hadoop,Spark任务的进度和结果

Spark Sql 映射问题