提高 Spark.SQL 中的数据整理性能

Posted

技术标签:

【中文标题】提高 Spark.SQL 中的数据整理性能【英文标题】:Improve data wrangling performance in Spark.SQL 【发布时间】:2020-05-19 08:08:31 【问题描述】:

我有一个包含多个 csv 文件的大型数据库。每个 csv 文件都包含最近 10 天的数据,只有最早的日期是最终数据。

例如“file_2019-08-11.csv”文件包含从 08-02 到 08-11 的数据(仅数据中日期为 08-02 的记录为最终记录)和“file_2019-08-12.csv”文件包含从 08-03 到 08-12 的数据(只有日期为 08-03 的记录是最终的)。

我正在使用 PySpark 来做到这一点。我的目标是仅保留 variables_2019-08-11.csv 文件中日期 08-02 的记录和 variables_2019-08-12.csv 文件中日期 08-03 的记录等。我正在使用 PySpark 和 Databricks 来做到这一点,我的 sn-p 正在工作但有点慢,尽管我在足够大的集群上运行它。

我很乐意就其他方案提出建议以提高其性能。谢谢

    import datetime
    # define the period range
    start_date="2019-08-12"
    end_date="2019-08-30



# create list of dates under date_generated variable

    start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]

# read first file

    filename="file_variables_"+str(date_generated[0])[0:10]+".csv"
    df=spark.read.csv(data_path+filename,header="true")
    df.createOrReplaceTempView("df")

#create the main file which we will use the other dates to append below this one

    final=spark.sql("select * from df where data_date in (select min(data_date) from df)")

#loop on other dates than the first date 

    for date in date_generated[1:len(date_generated)]:
      filename="file_variables_"+str(date)[0:10]+".csv"
      df=spark.read.csv(data_path+filename,header="true")
      df.createOrReplaceTempView("df")
      temp=spark.sql("select * from df where data_date in (select min(data_date) from df)")
      final=final.union(temp)
    final.createOrReplaceTempView("final")

【问题讨论】:

【参考方案1】:

我怀疑您的大型集群上的大多数内核都处于空闲状态,因为根据您的代码在每个文件上使用循环的结构方式,您的工作是处理一个文件并且只使用集群中的一个内核。查看集群 -> [Your Cluster] -> Metrics -> [Ganglia UI]

首先,最好将所有文件作为一组处理。如果您的逻辑依赖于输入文件名,请使用input_file_name()。在片场完成所有工作。循环会影响你的表现。

其次,我认为窗口化的 SQL 函数 dense_rank() 将帮助您找到组中所有日期的第一个日期 [input_file_name()]。这里有一篇介绍窗口函数的博客:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

df=spark.read.csv(data_path)

from pyspark.sql.functions import input_file_name
df2 = df.withColumn('file_name',input_file_name())

final = df2.<apply logic>

【讨论】:

谢谢,我实现了第一个一次性处理所有文件。但没能成为第二个。 (df2 .groupBy("file_name") .agg(min("data.date")) .show()) 好的,很酷,我对窗口函数有点模糊,很高兴它有帮助。

以上是关于提高 Spark.SQL 中的数据整理性能的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL项目中的优化思路

如何提高具有数组列的 DataFrame 的 Spark SQL 查询性能?

spark-sql 与 spark-shell REPL 中的 Spark SQL 性能差异

hive 或 impala 中的计算表统计信息如何加速 Spark SQL 中的查询?

spark sql 中的结构化数据

spark sql 性能调优