使用 pyspark 在某些类中应用函数时引发“PicklingError”错误
Posted
技术标签:
【中文标题】使用 pyspark 在某些类中应用函数时引发“PicklingError”错误【英文标题】:Raise ‘PicklingError’error when apply functions in certain class with pyspark 【发布时间】:2020-11-19 03:12:42 【问题描述】:我正在尝试通过 applyInPandas 在 spark 中使用 pandas 函数,当我在某个类中对其进行转换时,它会引发如下错误:pickle.PicklingError:无法序列化对象:异常:看来您是试图从广播变量、动作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。
我的脚本在函数类型编码中运行良好:
from scipy.stats import kendalltau
import numpy as np
import pandas as pd
def kendall(dat, a, b):
kentmp = []
ken = [np.nan, np.nan]
if type(a) is list:
if dat.shape[0] > 3:
for item in a:
kentmp.append(kendalltau(dat[item], dat[b])[0])
tmp = pd.Series(kentmp, index=a).dropna()
if tmp.shape[0] > 0:
cato = tmp.idxmax()
if (tmp < 0).any():
cato = tmp.abs().idxmax()
ken = [cato, tmp[cato]]
index = ['category', 'corr']
else:
if dat.shape[0] >= 10:
ken = [kendalltau(dat[a], dat[b])[0], dat.shape[0]]
index = ['corr', 'N']
return pd.Series(ken, index=index)
def kendall_process(pdf):
result = pdf.groupby(['step_id','unit_id']).apply(kendall,'process','label')
result = pd.DataFrame(result).reset_index()
#result.columns = ['step_id','unit_id','corr','N']
pdf['label'] = pdf.label.astype('int')
result_ = pdf.groupby(['step_id','unit_id'])['label'].mean().reset_index()
result = pd.merge(result,result_,on=['step_id','unit_id'],how='left')
result.columns = ['step_id','unit_id','corr','N','ratio']
return result
result = datInOut.groupBy('step_id','unit_id').applyInPandas(kendall_process, schema='step_id string,\
unit_id string,\
corr float,\
N long,\
ratio float')
result.show(5)
+--------------+--------+-----------+----+-----+
| step_id| unit_id| corr| N|ratio|
+--------------+--------+-----------+----+-----+
|10303_A2AOI300|A2AOI300| null|null| 0.0|
|17613_A2AOI500|A2AOI500|-0.13477948| 14| 0.5|
|1B304_A2MAC100|A2MAC100| null|null| 1.0|
|1A106_A2SPR100|A2SPR100| null|null| 1.0|
|19103_A2AOI800|A2AOI800| null|null| 0.5|
+--------------+--------+-----------+----+-----+
only showing top 5 rows
但是当我将其转换为类类型编码时,它会引发 PicklingError:
@staticmethod
def kendall(dat,a,b):
kentmp=[]
ken=[np.nan,np.nan]
if type(a) is list:
if dat.shape[0]>3:
for item in a:
kentmp.append(kendalltau(dat[item],dat[b])[0])
tmp=pd.Series(kentmp,index=a).dropna()
if tmp.shape[0]>0:
cato=tmp.idxmax()
if (tmp<0).any():
cato=tmp.abs().idxmax()
ken=[cato,tmp[cato]]
index=['category','corr']
else:
if dat.shape[0]>=10:
ken=[kendalltau(dat[a],dat[b])[0],dat.shape[0]]
index=['corr','N']
return pd.Series(ken,index=index)
@staticmethod
def kendall_delay(pdf):
result = pdf.groupby(['step_id','equip_id']).apply(QTWorker.kendall,'delay','label')
result = pd.DataFrame(result).reset_index()
pdf['label'] = pdf.label.astype('int')
result_ = pdf.groupby(['step_id', 'equip_id'])['label'].mean().reset_index()
result = pd.merge(result, result_, on=['step_id', 'equip_id'], how='left')
result.columns = ['step_id', 'equip_id', 'corr', 'N', 'ratio']
return result
ret = datQ.groupBy(self.step, self.equip).applyInPandas(self.kendall_delay, schema='step_id string,equip_id string,corr float,N long,ratio float')
如您所见,我已经装饰了与 staticmethod 一起使用的函数,但它仍然无法正常工作。我真的很想怎么解决它!
【问题讨论】:
【参考方案1】:即使我不知道为什么,但我已经通过将 kendall 函数放在 kendall_delay 下解决了这个问题。 我真的很想弄清楚它的原因!
@staticmethod
def kendall_process(pdf):
def kendall(dat, a, b):
kentmp = []
ken = [np.nan, np.nan]
if type(a) is list:
if dat.shape[0] > 3:
for item in a:
kentmp.append(kendalltau(dat[item], dat[b])[0])
tmp = pd.Series(kentmp, index=a).dropna()
if tmp.shape[0] > 0:
cato = tmp.idxmax()
if (tmp < 0).any():
cato = tmp.abs().idxmax()
ken = [cato, tmp[cato]]
index = ['category', 'corr']
else:
if dat.shape[0] >= 10:
ken = [kendalltau(dat[a], dat[b])[0], dat.shape[0]]
index = ['corr', 'N']
return pd.Series(ken, index=index)
result = pdf.groupby(['step_id','equip_id']).apply(kendall,'process','label')
result = pd.DataFrame(result).reset_index()
pdf['label'] = pdf.label.astype('int')
result_ = pdf.groupby(['step_id', 'equip_id'])['label'].mean().reset_index()
result = pd.merge(result, result_, on=['step_id', 'equip_id'], how='left')
result.columns = ['step_id', 'equip_id', 'corr', 'N', 'ratio']
return result
【讨论】:
以上是关于使用 pyspark 在某些类中应用函数时引发“PicklingError”错误的主要内容,如果未能解决你的问题,请参考以下文章
spark pyspark mllib 模型 - 当使用 map 生成预测 rdd 时,它会在 collect() 上引发异常
在 Pyspark 中读取 CSV 文件引发错误 FileNotFound 错误