How to increase Databricks performance?



Posted: 2021-09-06 02:23:18

Question:

I have a problem here: my write to Synapse takes a very long time to run (more than 20 hours). What can I do to improve the Databricks job that has to write to Synapse? My source is a fact table in Azure Synapse containing 151 million rows. I assume my script is not a good fit for running on Databricks, and I assume garbage collection is what is bogging the job down. How can I fix this so the job stops running for such a long time?

Here is my script, which groups by CUBE:

# Assumed import: the script references F for the aggregate functions below
from pyspark.sql import functions as F

# Cube over seven dimensions, computing member/non-member and promotion/non-promotion measures
cube_department_read = cube_department_df.cube(cube_department_df["YEAR"], cube_department_df["WeekOfYear"], cube_department_df["Month"], 
                                cube_department_df["department_groups"], cube_department_df["category_name"], 
                                cube_department_df["subcategory_name"], cube_department_df["section_name"]) \
        .agg(F.max('last_date_of_week').alias('last_date_of_week'), 
             F.countDistinct('internal_tranx_key').alias('sales_basket'), 
             F.sum('SalesAmt').alias('sales_amt'), 
             F.sum('SalesQty').alias('sales_qty'),
             F.sum('SalesQtyPro').alias('SalesQtyPro'), 
             F.sum('SalesAmtPro').alias('SalesAmtPro'),
             F.countDistinct('membership_id').alias('member_count'),
             F.sum(F.when(cube_department_df["membership_id"].isNotNull(), 
                        cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Member"),
             F.sum(F.when(cube_department_df["membership_id"].isNotNull(), 
                        cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Member"),
             F.sum(F.when(cube_department_df["membership_id"].isNotNull(), 
                        1).otherwise(0)).alias("Basket_Count_Member"),
             F.sum(F.when(cube_department_df["membership_id"].isNotNull(), 
                        0).otherwise(cube_department_df["SalesQty"])).alias("SalesQty_NonMember"),
             F.sum(F.when(cube_department_df["membership_id"].isNotNull(), 
                        0).otherwise(cube_department_df["SalesAmt"])).alias("SalesAmt_NonMember"),
             F.sum(F.when(cube_department_df["membership_id"].isNotNull(), 
                        0).otherwise(1)).alias("Basket_Count_NonMember"),
             F.sum(F.when(cube_department_df["promotion_flag"] == 'Y', 
                        cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_MMDS_Promotion"),
             F.sum(F.when(cube_department_df["promotion_flag"] == 'Y', 
                        cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_MMDS_Promotion"),
             F.sum(F.when(cube_department_df["promotion_flag"] == 'Y', 
                        1).otherwise(0)).alias("Basket_Count_MMDS_Promotion"),
             F.sum(F.when(cube_department_df["promotion_flag"] == 'Y', 
                        0).otherwise(cube_department_df["SalesAmt"])).alias("SalesAmt_Non_MMDS_Promotion"),
             F.sum(F.when(cube_department_df["promotion_flag"] == 'Y', 
                        0).otherwise(cube_department_df["SalesQty"])).alias("SalesQty_Non_MMDS_Promotion"),
             F.sum(F.when(cube_department_df["promotion_flag"] == 'Y', 
                        0).otherwise(1)).alias("Basket_Count_Non_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNotNull()), 
                        cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Member_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNotNull()), 
                        cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Member_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNotNull()), 
                        1).otherwise(0)).alias("Basket_Count_Member_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNull()), 
                        cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Non_Member_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNull()), 
                        cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Non_Member_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNull()), 
                        1).otherwise(0)).alias("Basket_Count_Non_Member_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNotNull()), 
                        cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Member_Non_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNotNull()), 
                        cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Member_Non_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNotNull()), 
                        1).otherwise(0)).alias("Basket_Count_Member_Non_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNull()), 
                        cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Non_Member_Non_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNull()), 
                        cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Non_Member_Non_MMDS_Promotion"),
             F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNull()), 
                        1).otherwise(0)).alias("Basket_Count_Non_Member_Non_MMDS_Promotion"),
            F.when((F.sum(cube_department_df["SalesQty"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
                (F.sum(cube_department_df["SalesAmt"]) / F.sum(cube_department_df["SalesQty"])) * -1) \
                .when((F.sum(cube_department_df["SalesQty"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
                0).otherwise(F.sum(cube_department_df["SalesAmt"]) / F.sum(cube_department_df["SalesQty"])).alias("sales_per_unit"),
            F.when((F.sum(cube_department_df["SalesQty"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
                (F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["internal_tranx_key"])) * -1) \
                .when((F.sum(cube_department_df["SalesQty"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
                0).otherwise(F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["internal_tranx_key"])).alias("sales_per_basket"),
            F.when((F.sum(cube_department_df["SalesQty"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
                (F.sum(cube_department_df["SalesQty"]) / F.countDistinct(cube_department_df["internal_tranx_key"])) * -1) \
                .when((F.sum(cube_department_df["SalesQty"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
                0).otherwise(F.sum(cube_department_df["SalesQty"]) / F.countDistinct(cube_department_df["internal_tranx_key"])).alias("unit_per_basket"),    
            F.when((F.countDistinct(cube_department_df["membership_id"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
                (F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["membership_id"])) * -1) \
                .when((F.countDistinct(cube_department_df["membership_id"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
                0).otherwise(F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["membership_id"])).alias("spend_per_customer")) \
        .select("YEAR","WeekOfYear","Month","department_groups","category_name","subcategory_name","section_name",
                "last_date_of_week","sales_basket","sales_amt","sales_qty","SalesQtyPro","SalesAmtPro",
                "member_count","SalesQty_Member","SalesAmt_Member", "Basket_Count_Member",
                "SalesQty_NonMember","SalesAmt_NonMember", "Basket_Count_NonMember", 
                "SalesAmt_MMDS_Promotion", "SalesQty_MMDS_Promotion", "Basket_Count_MMDS_Promotion",
                "SalesAmt_Non_MMDS_Promotion","SalesQty_Non_MMDS_Promotion", "Basket_Count_Non_MMDS_Promotion",
                "SalesAmt_Member_MMDS_Promotion","SalesQty_Member_MMDS_Promotion","Basket_Count_Member_MMDS_Promotion",
                "SalesAmt_Non_Member_MMDS_Promotion","SalesQty_Non_Member_MMDS_Promotion","Basket_Count_Non_Member_MMDS_Promotion",
                "SalesAmt_Member_Non_MMDS_Promotion","SalesQty_Member_Non_MMDS_Promotion","Basket_Count_Member_Non_MMDS_Promotion",
               "SalesAmt_Non_Member_Non_MMDS_Promotion","SalesQty_Non_Member_Non_MMDS_Promotion","Basket_Count_Non_Member_Non_MMDS_Promotion",
                "sales_per_unit","sales_per_basket","unit_per_basket", "spend_per_customer") \
        .orderBy(F.col("YEAR").asc(), 
           F.col("WeekOfYear").asc(), 
           F.col("Month").asc(),
           F.col("department_groups").asc(), 
           F.col("category_name").asc(),
           F.col("subcategory_name").asc(), 
           F.col("section_name").asc())

Here is my garbage collection:

So, what can I do from here? My source is a fact table with 151 million rows. Sorry, I am new to Databricks; I need to run this CUBE script in Databricks because Synapse does not support CUBE yet.

Comments:

Answer 1:

The slowest part of writing from Databricks to Synapse is the step where Databricks writes to the temporary directory (Azure Blob Storage).

PolyBase does not handle the Databricks -> Blob Storage step. Only the Blob Storage -> Synapse/Azure DW step uses PolyBase, and that part usually moves quickly.

You can try changing the write semantics; see the Databricks documentation.

With the COPY write semantics, you will be able to load data into Synapse faster.

You can set this configuration before running the write command, like this:

spark.conf.set("spark.databricks.sqldw.writeSemantics", "copy")
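For reference, here is a minimal sketch of a full write using the Azure Synapse (sqldw) connector with COPY semantics and a Blob Storage staging directory. The original post does not show the write step, so the JDBC URL, staging path, and target table below are placeholders, not values from the question:

# Minimal sketch, assuming the Azure Synapse connector is available on the cluster.
# The URL, tempDir path, and target table are placeholders.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "copy")  # use COPY instead of PolyBase

(cube_department_read.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>")
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "dbo.cube_department")  # placeholder target table
    .option("tempDir", "wasbs://<container>@<account>.blob.core.windows.net/tempdir")
    .mode("overwrite")
    .save())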

Comments:

I tried this configuration, but nothing happened; it still writes just as slowly as before. Is there any other way to deal with Databricks performance? I also tried writing Delta to my Blob storage, but the performance was the same as writing to Synapse, very, very slow.

I would suggest raising a support request for the Synapse write issue. For the Blob part, if I find anything I will try to offer a solution.

I think this is more a performance problem with the cube function than with persisting the data to the file system. The cube function generates rows for every possible combination of the grouping columns. explode is another row-generating function that can be extremely inefficient in some cases. A sketch of one way to limit that cost follows below.
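If the bottleneck really is the cube rather than the write, one option is to compute only the roll-up levels that are actually consumed downstream. cube() over seven columns produces all 2^7 = 128 grouping combinations; explicit GROUPING SETS restricts the work to the combinations you name. This is only a sketch under that assumption; the view name, the chosen grouping sets, and the reduced measure list are illustrative, not from the original post:

# Illustrative sketch: compute only the needed roll-up levels instead of the full cube.
# "dept_sales" and the grouping sets shown are assumptions, not from the original script.
cube_department_df.createOrReplaceTempView("dept_sales")

dept_rollup = spark.sql("""
    SELECT YEAR, WeekOfYear, Month, department_groups,
           SUM(SalesAmt)                      AS sales_amt,
           SUM(SalesQty)                      AS sales_qty,
           COUNT(DISTINCT internal_tranx_key) AS sales_basket
    FROM dept_sales
    GROUP BY GROUPING SETS (
        (YEAR, WeekOfYear, Month, department_groups),
        (YEAR, Month, department_groups),
        (YEAR, department_groups)
    )
""")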
