PySpark groupby applyInPandas 将对象保存为文件问题

Posted

技术标签:

【中文标题】PySpark groupby applyInPandas 将对象保存为文件问题【英文标题】:PySpark groupby applyInPandas save objects as files problem 【发布时间】:2021-05-08 16:45:29 【问题描述】:

我在 Jupyter Notebook 上以本地模式在 Windows 计算机上运行 PySpark 3.1。我在 Spark DataFrame 上调用“applyInPandas”。

下面的函数对输入的 Pandas DataFrame 应用一些数据转换,并训练一个 SGBT 模型。然后它将训练好的模型序列化为二进制并作为对象保存到 S3 存储桶中。最后它返回DataFrame。我从最后一行按两列分组的 Spark DataFrame 调用此函数。我没有收到错误,返回的 DataFrame 与输入的长度相同。返回每个组的数据。

问题是保存的模型对象。当每个组都应该有模型时,只有 2 组的对象保存在 S3 中。没有丢失/错误的数据点会导致模型训练失败。 (无论如何我都会收到错误或警告。)到目前为止我所尝试的:

替换 S3 并保存到本地文件系统:结果相同。 将“pickle”替换为“joblib”和“BytesIO”:结果相同。 调用函数前重新分区:现在我为不同的组保存了更多对象,但不是全部。 [我通过在最后一行调用“val_large_df.coalesce(1).groupby('la...”来做到这一点。]

所以我怀疑这是关于并行性和分布的,但我无法弄清楚。已经谢谢你了。

def train_sgbt(pdf):      
       ##Some data transformations here##    
       #Train the model
       sgbt_mdl=GradientBoostingRegressor(--Params.--).fit(--Params.--)
       sgbt_mdl_b=pickle.dumps(sgbt_mdl) #Serialize
       #Initiate s3_client
       s3_client = boto3.client(--Params.--)
       #Put file in S3
       s3_client.put_object(Body=sgbt_mdl_b, Bucket='my-bucket-name', 
            Key="models/BT_"+str(pdf.latGroup_m[0])+"_"+str(pdf.lonGroup_m[0])+".mdl")    
       return pdf

dummy_df=val_large_df.groupby("latGroup_m","lonGroup_m").applyInPandas(train_sgbt, 
           schema="fcast_error double")
dummy_df.show()

【问题讨论】:

【参考方案1】:

Spark 会评估 dummy_df lazy,因此只会为完成 Spark 操作所需的组调用 train_sgbt

这里的 Spark 操作是 show()。此操作仅打印前 20 行,因此仅对前 20 行中至少有一个元素的组调用 train_sgbt。 Spark可能评估更多组,但不能保证。

解决问题的一种方法是调用另一个操作,例如csv

【讨论】:

以上是关于PySpark groupby applyInPandas 将对象保存为文件问题的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 计数 groupby 与 None 键

熊猫 groupby.apply 到 pyspark

从 Pandas groupBy 到 PySpark groupBy

检查一列是不是与pyspark中的groupby连续

Pyspark 根据数据框 groupBy 制作多个文件

PySpark 2.1.1 groupby + approx_count_distinct 计数为 0