动态构建大型数据框(spark 或 pandas)以导出到 csv 的方法

Posted

技术标签:

【中文标题】动态构建大型数据框(spark 或 pandas)以导出到 csv 的方法【英文标题】:Method to dynamically build large dataframe (spark or pandas) for export to csv 【发布时间】:2020-07-11 22:49:39 【问题描述】:

我有一个使用 spark.read 导入数据块的 csv。这个大文件包含每天的记录/交易。我将数据框缩减为 5 列,并保持 500,000 行不变。我正在尝试构建此源文件的汇总表,以一个月级别(汇总)表示这些记录/交易。

该脚本有一个 filter/groupby/sum 命令,它返回一行,将数据汇总为一个月的计数。查询返回的行如下所示:

+---------+---------+-------+-------------+
|  Country|StockCode|YYYY-MM|sum(Quantity)|
+---------+---------+-------+-------------+
|Singapore|        M| 2011-4|           10|
+---------+---------+-------+-------------+

脚本遍历源数据帧并每次返回。我无法使用此脚本的输出(显示或 csv 导出)。在 pyspark 和 pandas 中我都有问题。我不知道如何堆叠查询的结果以及它应该是什么形式?

#熊猫 如果我在 pandas 中执行此操作,脚本生成文件需要很长时间(我相信 pandas + 我这样做效率不高导致持续时间延长)~ 2.5 小时。 display 和 write.csv 命令的运行速度相当快,并且在大约几秒钟内完成。

#Pyspark 如果我在 pyspark 中执行此操作,脚本大约需要 10 分钟才能完成,但显示和导出崩溃。笔记本要么返回超时错误,要么重新启动,要么引发崩溃错误。

该方法是否应该是动态创建列表列表,并在完全构建后将其转换为数据框以供使用?我一直在尝试所有遇到的方法,但似乎没有任何进展。

这是生成结果的代码

#officeSummaryDFBefore
column_names = "Country|StockCode|YYYY-MM|Quantity"
monthlyCountsBeforeImpactDate = spark.createDataFrame(
  [
    tuple('' for i in column_names.split("|"))
  ],
  column_names.split("|")
).where("1=0")

monthlyCountsBeforeImpacteDateRow = spark.createDataFrame(
  [
    tuple('' for i in column_names.split("|"))
  ],
  column_names.split("|")
).where("1=0")

try :
  for country in country_lookup :
    country = country[0]
    print(country_count, " country(s) left")
    country_count = country_count - 1
    for stockCode in stockCode_lookup :
      stockCode = stockCode[0]
      monthlyCountsBeforeImpacteDateRow = dataBeforeImpactDate.filter((col("Country").rlike(country)) & (col("StockCode").rlike(stockCode))).groupby("Country", "StockCode", "YYYY-MM").sum()
      monthlyCountsBeforeImpacteDateRow.show()
      dfsCountsBefore = [monthlyCountsBeforeImpacteDateRow, monthlyCountsBeforeImpactDate]
      monthlyCountsBeforeImpactDate = reduce(DataFrame.union, dfsCountsBefore)
      
except Exception as e:
  print(e) 

我在循环内声明 dfsCountsBeforeImpactDate 这似乎不正确,但是当它在循环外时它返回为 NULL。

【问题讨论】:

你可以考虑为你的系统提供更多的内存:a)提供更多的物理内存,b)del不再使用的变量,c)你真的需要500,000行数据来获取摘要; d) 能否生成多个汇总行,然后获取汇总汇总? 【参考方案1】:

IIUC 您正在查找国家和股票以限制行,然后对它们进行分组以生成聚合。

为什么不完全过滤df然后分组

df = dataBeforeImpactDate

df = df.filter(col('country').isin(country_lookup) & col('stock').isin(stock_lookup))

df = df.groupby("Country", "StockCode", "YYYY-MM").sum()

df.show()

这会更快,因为您不需要循环过滤器,也不需要联合。

【讨论】:

让我试试你说的。感谢您的帮助!

以上是关于动态构建大型数据框(spark 或 pandas)以导出到 csv 的方法的主要内容,如果未能解决你的问题,请参考以下文章

如何将 pandas udf 应用于大型矩阵数据框

Pandas-根据开关用数据框填充字典

在 for 循环中构建 Spark sql 数据集

Pandas - 将大型数据框切成块

将大型 Dask 数据框与小型 Pandas 数据框合并

使用 sql 或 pandas 数据框获取前 5 行的 pyspark 数据框