在 Pyspark 中管理多个数据框

Posted

技术标签:

【中文标题】在 Pyspark 中管理多个数据框【英文标题】:Managing multiple dataframes in Pyspark 【发布时间】:2020-12-18 07:42:21 【问题描述】:

我是 PySpark 的新手。在我的实现中,我使用了多个数据框。其中某些是中间数据帧,稍后将不会在代码中使用。我该如何处理它们?我正面临 GC OverHead 和内存问题。任何帮助表示赞赏

df_total_ch=df_agg.groupby('circle','vendor','zone','nss_id','category','sub_category','node_model','node_name','OAF').agg(func.count('OAF').alias('total_Card_count'))

df_total_ch=df_total_ch.groupby('circle','vendor','zone','nss_id','category','sub_category','node_model','node_name').agg(func.count("*").alias('total_Card_count'))

df_new=df_agg.join(df_total_ch,on=['circle','vendor','zone','nss_id','category','sub_category','node_model','node_name'],how='left_outer')

base_df=df_new.select ('eventtime','circle','vendor','zone','nss_id','category','sub_category','node_model','node_name','OAF','temperature_event_count','temperature_weight','power_event_count','power_weight','hardware_event_count','hardware_weight','other_event_count','other_weight','housekeeping_event_count','housekeeping_weight')

base_df_final = base_df.groupby('eventtime','circle','vendor','zone','nss_id','category','sub_category','node_model','node_name','OAF').agg(func.sum('temperature_event_count').alias('temperature_event_count'),func.sum('temperature_weight').alias('temperature_weight'),func.sum('power_event_count').alias('power_event_count'),func.sum('power_weight').alias('power_weight'),func.sum('hardware_event_count').alias('hardware_event_count'),func.sum('hardware_weight').alias('hardware_weight'),func.sum('other_event_count').alias('other_event_count'),func.sum('other_weight').alias('other_weight'),func.sum('housekeeping_event_count').alias('housekeeping_event_count'),func.sum('housekeeping_weight').alias('housekeeping_weight'))

base_df_final=base_df_final.withColumn('hardware_flag',when(func.col('hardware_event_count') > 0,lit('Yes')).otherwise(lit('No')))
base_df_final=base_df_final.withColumn('power_flag',when(func.col('power_event_count') > 0,lit('Yes')).otherwise(lit('No')))
base_df_final=base_df_final.withColumn('temperature_flag',when(func.col('temperature_event_count') > 0,lit('Yes')).otherwise(lit('No')))
base_df_final=base_df_final.withColumn('others_flag',when(func.col('other_event_count') > 0,lit('Yes')).otherwise(lit('No')))
base_df_final=base_df_final.withColumn('housekeeping_flag',when(func.col('housekeeping_event_count') > 0,lit('Yes')).otherwise(lit('No')))

base_df_final.write.mode("append").saveAsTable("DFT.TBL_TX")

我收到以下错误

    base_df_final.write.mode("append").saveAsTable("DFT.TBL_TX")
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p0.1796617/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 775, in saveAsTable
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p0.1796617/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p0.1796617/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p0.1796617/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o482.saveAsTable.
: org.spark_project.guava.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: GC overhead limit exceeded
   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
   at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
   at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
   at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:141)

【问题讨论】:

由于 spark 的惰性计算,这些中间数据帧实际上都不存在。您可以将这些数据框视为仅查询计划。如果您遇到内存问题,请尝试使用 select('*', col) 而不是 withColumn(col) - 这有时会有所帮助 能否详细说明在我当前的代码中使用这个“select('*',col) @stacktesting 例如,第一行 withcolumn 可以替换为base_df_final = base_df_final.select('*', when(func.col('hardware_event_count') > 0,lit('Yes')).otherwise(lit('No')).alias('hardware_flag'))。与列的其他行类似。 我尝试了上述方法。但仍然面临 GC 开销问题 【参考方案1】:

Spark 变换是惰性求值的。所有转换都已添加到数据框中,但尚未应用。当您调用 show()、count()、write() 等操作方法时,此时会应用转换。

因此,您正在对数据框应用大量链式转换并分配给新的转换。所有转换都存在于每个数据框的历史记录中。您可以通过执行 df.explain() 方法来查看它们。你终于获得了base_df_final 数据帧,这个df包含了上面所有的转换,并准备好使用action方法write()来应用。但是内存不足以同时做所有的操作,所以应该分成多个操作。

您可以考虑在转换之间的某处临时写入和读取相同的数据帧。这会将所有转换应用于 df 并清除它的历史记录。因此,您为两个不同(或更多)的操作使用更少的内存并临时使用磁盘空间

Spark Transformation - Why its lazy and what is the advantage?

【讨论】:

我试过上面的。但是代码仍然遇到内存问题 1/ 你能分享错误信息吗? 2/你的数据有多大,你的内存呢?您可以考虑更频繁的写入/读取操作或首先重新分区您的数据帧。如果分区大小很大,您可能会减少 spark.sql.shuffle.partitions 配置 我添加了中间读/写操作。此外,当数据存在于 spark 中并且我能够执行“计数”操作时。但它仅在加载到最终配置单元表时失败。我也用错误更新了原始问题。 我们有 256GB 内存。此外,我尝试将“spark.sql.shuffle.partitions”设置为值“10”和“100”。但它仍然抛出同样的错误

以上是关于在 Pyspark 中管理多个数据框的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:在 Spark 数据框中读取多个 XML 文件(s3 路径列表)

如何在pyspark数据框中添加多个带有when条件的新列?

Pyspark 根据数据框 groupBy 制作多个文件

如何在pyspark中加入具有多个重叠的两个数据框

如何在pyspark中按列合并多个数据框?

使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?