How to increase Databricks performance?
Posted: 2021-09-06 02:23:18

Question: I have a problem here: my write to Synapse takes a very long time to run (> 20 hours). What can I do to improve the Databricks job that has to write to Synapse? My source is a fact table on Azure Synapse containing 151 million rows. I assume my script is not well suited to running on Databricks, and I suspect garbage collection is what is bogging the job down. How can I fix this so the job no longer runs for so long and finishes as quickly as possible?
Here is my script, grouped with CUBE:
from pyspark.sql import functions as F  # aggregate helpers (sum, countDistinct, when, ...) used below

# cube_department_df is the 151M-row fact DataFrame read from Synapse
cube_department_read = cube_department_df.cube(cube_department_df["YEAR"], cube_department_df["WeekOfYear"], cube_department_df["Month"],
cube_department_df["department_groups"], cube_department_df["category_name"],
cube_department_df["subcategory_name"], cube_department_df["section_name"]) \
.agg(F.max('last_date_of_week').alias('last_date_of_week'),
F.countDistinct('internal_tranx_key').alias('sales_basket'),
F.sum('SalesAmt').alias('sales_amt'),
F.sum('SalesQty').alias('sales_qty'),
F.sum('SalesQtyPro').alias('SalesQtyPro'),
F.sum('SalesAmtPro').alias('SalesAmtPro'),
F.countDistinct('membership_id').alias('member_count'),
F.sum(F.when(cube_department_df["membership_id"].isNotNull(),
cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Member"),
F.sum(F.when(cube_department_df["membership_id"].isNotNull(),
cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Member"),
F.sum(F.when(cube_department_df["membership_id"].isNotNull(),
1).otherwise(0)).alias("Basket_Count_Member"),
F.sum(F.when(cube_department_df["membership_id"].isNotNull(),
0).otherwise(cube_department_df["SalesQty"])).alias("SalesQty_NonMember"),
F.sum(F.when(cube_department_df["membership_id"].isNotNull(),
0).otherwise(cube_department_df["SalesAmt"])).alias("SalesAmt_NonMember"),
F.sum(F.when(cube_department_df["membership_id"].isNotNull(),
0).otherwise(1)).alias("Basket_Count_NonMember"),
F.sum(F.when(cube_department_df["promotion_flag"] == 'Y',
cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_MMDS_Promotion"),
F.sum(F.when(cube_department_df["promotion_flag"] == 'Y',
cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_MMDS_Promotion"),
F.sum(F.when(cube_department_df["promotion_flag"] == 'Y',
1).otherwise(0)).alias("Basket_Count_MMDS_Promotion"),
F.sum(F.when(cube_department_df["promotion_flag"] == 'Y',
0).otherwise(cube_department_df["SalesAmt"])).alias("SalesAmt_Non_MMDS_Promotion"),
F.sum(F.when(cube_department_df["promotion_flag"] == 'Y',
0).otherwise(cube_department_df["SalesQty"])).alias("SalesQty_Non_MMDS_Promotion"),
F.sum(F.when(cube_department_df["promotion_flag"] == 'Y',
0).otherwise(1)).alias("Basket_Count_Non_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNotNull()),
cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Member_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNotNull()),
cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Member_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNotNull()),
1).otherwise(0)).alias("Basket_Count_Member_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNull()),
cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Non_Member_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNull()),
cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Non_Member_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'Y') & (cube_department_df["membership_id"].isNull()),
1).otherwise(0)).alias("Basket_Count_Non_Member_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNotNull()),
cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Member_Non_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNotNull()),
cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Member_Non_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNotNull()),
1).otherwise(0)).alias("Basket_Count_Member_Non_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNull()),
cube_department_df["SalesAmt"]).otherwise(0)).alias("SalesAmt_Non_Member_Non_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNull()),
cube_department_df["SalesQty"]).otherwise(0)).alias("SalesQty_Non_Member_Non_MMDS_Promotion"),
F.sum(F.when((cube_department_df["promotion_flag"] == 'N') & (cube_department_df["membership_id"].isNull()),
1).otherwise(0)).alias("Basket_Count_Non_Member_Non_MMDS_Promotion"),
F.when((F.sum(cube_department_df["SalesQty"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
(F.sum(cube_department_df["SalesAmt"]) / F.sum(cube_department_df["SalesQty"])) * -1) \
.when((F.sum(cube_department_df["SalesQty"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
0).otherwise(F.sum(cube_department_df["SalesAmt"]) / F.sum(cube_department_df["SalesQty"])).alias("sales_per_unit"),
F.when((F.sum(cube_department_df["SalesQty"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
(F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["internal_tranx_key"])) * -1) \
.when((F.sum(cube_department_df["SalesQty"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
0).otherwise(F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["internal_tranx_key"])).alias("sales_per_basket"),
F.when((F.sum(cube_department_df["SalesQty"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
(F.sum(cube_department_df["SalesQty"]) / F.countDistinct(cube_department_df["internal_tranx_key"])) * -1) \
.when((F.sum(cube_department_df["SalesQty"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
0).otherwise(F.sum(cube_department_df["SalesQty"]) / F.countDistinct(cube_department_df["internal_tranx_key"])).alias("unit_per_basket"),
F.when((F.countDistinct(cube_department_df["membership_id"]) < 0) & (F.sum(cube_department_df["SalesAmt"]) < 0),
(F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["membership_id"])) * -1) \
.when((F.countDistinct(cube_department_df["membership_id"]) == 0) | (F.sum(cube_department_df["SalesAmt"]) == 0),
0).otherwise(F.sum(cube_department_df["SalesAmt"]) / F.countDistinct(cube_department_df["membership_id"])).alias("spend_per_customer")) \
.select("YEAR","WeekOfYear","Month","department_groups","category_name","subcategory_name","section_name",
"last_date_of_week","sales_basket","sales_amt","sales_qty","SalesQtyPro","SalesAmtPro",
"member_count","SalesQty_Member","SalesAmt_Member", "Basket_Count_Member",
"SalesQty_NonMember","SalesAmt_NonMember", "Basket_Count_NonMember",
"SalesAmt_MMDS_Promotion", "SalesQty_MMDS_Promotion", "Basket_Count_MMDS_Promotion",
"SalesAmt_Non_MMDS_Promotion","SalesQty_Non_MMDS_Promotion", "Basket_Count_Non_MMDS_Promotion",
"SalesAmt_Member_MMDS_Promotion","SalesQty_Member_MMDS_Promotion","Basket_Count_Member_MMDS_Promotion",
"SalesAmt_Non_Member_MMDS_Promotion","SalesQty_Non_Member_MMDS_Promotion","Basket_Count_Non_Member_MMDS_Promotion",
"SalesAmt_Member_Non_MMDS_Promotion","SalesQty_Member_Non_MMDS_Promotion","Basket_Count_Member_Non_MMDS_Promotion",
"SalesAmt_Non_Member_Non_MMDS_Promotion","SalesQty_Non_Member_Non_MMDS_Promotion","Basket_Count_Non_Member_Non_MMDS_Promotion",
"sales_per_unit","sales_per_basket","unit_per_basket", "spend_per_customer") \
.orderBy(F.col("YEAR").asc(),
F.col("WeekOfYear").asc(),
F.col("Month").asc(),
F.col("department_groups").asc(),
F.col("category_name").asc(),
F.col("subcategory_name").asc(),
F.col("section_name").asc())
Here is my garbage collection (screenshot):
So, what can I do from here? My source is a fact table with 151 million rows. Sorry, I'm new to Databricks; I need to produce this CUBE, and since Synapse does not yet support the CUBE I need, I have to build it in Databricks.
Answer 1:

The slowest part of writing from Databricks to Synapse is the step where Databricks writes to the temporary directory (Azure Blob Storage).

The Databricks -> Blob Storage step is not handled by PolyBase; only the Blob Storage -> Synapse/Azure DW step uses PolyBase, and that part usually moves quickly.

You can try changing the write semantics: Databricks documentation

With COPY write semantics you will be able to load data into Synapse faster.

You can set this configuration before running the write command, like this:
spark.conf.set("spark.databricks.sqldw.writeSemantics", "copy")
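For reference, here is a minimal sketch of what the subsequent write command might look like with the Azure Synapse (sqldw) connector. Every option value below is a placeholder rather than the asker's real configuration, and the tempDir option points at the Blob/ADLS staging area described above:

# Hedged sketch only: placeholder server, database, storage and table names.
(cube_department_read.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<dw>")
    .option("tempDir", "abfss://<container>@<storage-account>.dfs.core.windows.net/tempdir")  # staging area
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "dbo.cube_department")
    .mode("overwrite")
    .save())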
Comments:
I tried this configuration, but nothing happened; it writes just as slowly as before. Is there any other way to deal with the Databricks performance? I also tried writing to my Blob Storage as Delta, but the performance was the same as writing to Synapse, very, very slow.

I would suggest raising a support request for the Synapse write issue. For the Blob part, if I find anything I will try to offer a solution.

I think this is more a performance issue with the cube function than with persisting the data to the file system. The cube function generates rows for every possible combination of the grouping columns. explode is also a row-generating function and can be extremely inefficient in some cases.
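To make that last comment concrete, here is a small sketch (plain Python arithmetic, not tied to the actual data) of how many grouping sets cube() materializes compared with rollup() for the seven grouping columns used in the question:

# Illustrative numbers only: each input row can contribute to every matching grouping set.
n = 7                 # YEAR, WeekOfYear, Month, department_groups, category_name, subcategory_name, section_name
print(2 ** n)         # 128 grouping sets produced by cube()
print(n + 1)          # 8 grouping sets produced by rollup()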