PySpark groupby applyInPandas 将对象保存为文件问题
Posted
技术标签:
【中文标题】PySpark groupby applyInPandas 将对象保存为文件问题【英文标题】:PySpark groupby applyInPandas save objects as files problem 【发布时间】:2021-05-08 16:45:29 【问题描述】:我在 Jupyter Notebook 上以本地模式在 Windows 计算机上运行 PySpark 3.1。我在 Spark DataFrame 上调用“applyInPandas”。
下面的函数对输入的 Pandas DataFrame 应用一些数据转换,并训练一个 SGBT 模型。然后它将训练好的模型序列化为二进制并作为对象保存到 S3 存储桶中。最后它返回DataFrame。我从最后一行按两列分组的 Spark DataFrame 调用此函数。我没有收到错误,返回的 DataFrame 与输入的长度相同。返回每个组的数据。
问题是保存的模型对象。当每个组都应该有模型时,只有 2 组的对象保存在 S3 中。没有丢失/错误的数据点会导致模型训练失败。 (无论如何我都会收到错误或警告。)到目前为止我所尝试的:
替换 S3 并保存到本地文件系统:结果相同。 将“pickle”替换为“joblib”和“BytesIO”:结果相同。 调用函数前重新分区:现在我为不同的组保存了更多对象,但不是全部。 [我通过在最后一行调用“val_large_df.coalesce(1).groupby('la...”来做到这一点。]所以我怀疑这是关于并行性和分布的,但我无法弄清楚。已经谢谢你了。
def train_sgbt(pdf):
##Some data transformations here##
#Train the model
sgbt_mdl=GradientBoostingRegressor(--Params.--).fit(--Params.--)
sgbt_mdl_b=pickle.dumps(sgbt_mdl) #Serialize
#Initiate s3_client
s3_client = boto3.client(--Params.--)
#Put file in S3
s3_client.put_object(Body=sgbt_mdl_b, Bucket='my-bucket-name',
Key="models/BT_"+str(pdf.latGroup_m[0])+"_"+str(pdf.lonGroup_m[0])+".mdl")
return pdf
dummy_df=val_large_df.groupby("latGroup_m","lonGroup_m").applyInPandas(train_sgbt,
schema="fcast_error double")
dummy_df.show()
【问题讨论】:
【参考方案1】:Spark 会评估 dummy_df
lazy,因此只会为完成 Spark 操作所需的组调用 train_sgbt
。
这里的 Spark 操作是 show()
。此操作仅打印前 20 行,因此仅对前 20 行中至少有一个元素的组调用 train_sgbt
。 Spark可能评估更多组,但不能保证。
解决问题的一种方法是调用另一个操作,例如csv
。
【讨论】:
以上是关于PySpark groupby applyInPandas 将对象保存为文件问题的主要内容,如果未能解决你的问题,请参考以下文章