使用 pyspark 在某些类中应用函数时引发“PicklingError”错误

Posted

技术标签:

【中文标题】使用 pyspark 在某些类中应用函数时引发“PicklingError”错误【英文标题】:Raise ‘PicklingError’error when apply functions in certain class with pyspark 【发布时间】:2020-11-19 03:12:42 【问题描述】:

我正在尝试通过 applyInPandas 在 spark 中使用 pandas 函数,当我在某个类中对其进行转换时,它会引发如下错误:pickle.PicklingError:无法序列化对象:异常:看来您是试图从广播变量、动作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。

我的脚本在函数类型编码中运行良好:

from scipy.stats import kendalltau
import numpy as np
import pandas as pd

def kendall(dat, a, b):
        kentmp = []
        ken = [np.nan, np.nan]
        if type(a) is list:
            if dat.shape[0] > 3:
                for item in a:
                    kentmp.append(kendalltau(dat[item], dat[b])[0])
                tmp = pd.Series(kentmp, index=a).dropna()
                if tmp.shape[0] > 0:
                    cato = tmp.idxmax()
                    if (tmp < 0).any():
                        cato = tmp.abs().idxmax()
                    ken = [cato, tmp[cato]]
            index = ['category', 'corr']
        else:
            if dat.shape[0] >= 10:
                ken = [kendalltau(dat[a], dat[b])[0], dat.shape[0]]
            index = ['corr', 'N']
        return pd.Series(ken, index=index)

def kendall_process(pdf):
        result = pdf.groupby(['step_id','unit_id']).apply(kendall,'process','label')
        result = pd.DataFrame(result).reset_index()
        #result.columns = ['step_id','unit_id','corr','N']
        pdf['label'] = pdf.label.astype('int')
        result_ = pdf.groupby(['step_id','unit_id'])['label'].mean().reset_index()
        result = pd.merge(result,result_,on=['step_id','unit_id'],how='left')
        result.columns = ['step_id','unit_id','corr','N','ratio']
        return result
result = datInOut.groupBy('step_id','unit_id').applyInPandas(kendall_process, schema='step_id string,\
                                                                                        unit_id string,\
                                                                                         corr float,\
                                                                                       N long,\
                                                                                       ratio float')
                                                                                    
result.show(5)
+--------------+--------+-----------+----+-----+
|       step_id| unit_id|       corr|   N|ratio|
+--------------+--------+-----------+----+-----+
|10303_A2AOI300|A2AOI300|       null|null|  0.0|
|17613_A2AOI500|A2AOI500|-0.13477948|  14|  0.5|
|1B304_A2MAC100|A2MAC100|       null|null|  1.0|
|1A106_A2SPR100|A2SPR100|       null|null|  1.0|
|19103_A2AOI800|A2AOI800|       null|null|  0.5|
+--------------+--------+-----------+----+-----+
only showing top 5 rows

但是当我将其转换为类类型编码时,它会引发 PicklingError:

@staticmethod
def kendall(dat,a,b):
        kentmp=[]
        ken=[np.nan,np.nan]
        if type(a) is list:
            if dat.shape[0]>3:
                for item in a:
                    kentmp.append(kendalltau(dat[item],dat[b])[0])
                tmp=pd.Series(kentmp,index=a).dropna()
                if tmp.shape[0]>0:
                    cato=tmp.idxmax()
                    if (tmp<0).any():
                        cato=tmp.abs().idxmax()
                    ken=[cato,tmp[cato]]
            index=['category','corr']
        else:
            if dat.shape[0]>=10:
                ken=[kendalltau(dat[a],dat[b])[0],dat.shape[0]]
            index=['corr','N']
        return pd.Series(ken,index=index)
@staticmethod
def kendall_delay(pdf):
        result = pdf.groupby(['step_id','equip_id']).apply(QTWorker.kendall,'delay','label')
        result = pd.DataFrame(result).reset_index()
        pdf['label'] = pdf.label.astype('int')
        result_ = pdf.groupby(['step_id', 'equip_id'])['label'].mean().reset_index()
        result = pd.merge(result, result_, on=['step_id', 'equip_id'], how='left')
        result.columns = ['step_id', 'equip_id', 'corr', 'N', 'ratio']
        return result
ret = datQ.groupBy(self.step, self.equip).applyInPandas(self.kendall_delay, schema='step_id string,equip_id string,corr float,N long,ratio float')

如您所见,我已经装饰了与 staticmethod 一起使用的函数,但它仍然无法正常工作。我真的很想怎么解决它!

【问题讨论】:

【参考方案1】:

即使我不知道为什么,但我已经通过将 kendall 函数放在 kendall_delay 下解决了这个问题。 我真的很想弄清楚它的原因!

@staticmethod
def kendall_process(pdf):
        def kendall(dat, a, b):
            kentmp = []
            ken = [np.nan, np.nan]
            if type(a) is list:
                if dat.shape[0] > 3:
                    for item in a:
                        kentmp.append(kendalltau(dat[item], dat[b])[0])
                    tmp = pd.Series(kentmp, index=a).dropna()
                    if tmp.shape[0] > 0:
                        cato = tmp.idxmax()
                        if (tmp < 0).any():
                            cato = tmp.abs().idxmax()
                        ken = [cato, tmp[cato]]
                index = ['category', 'corr']
            else:
                if dat.shape[0] >= 10:
                    ken = [kendalltau(dat[a], dat[b])[0], dat.shape[0]]
                index = ['corr', 'N']
            return pd.Series(ken, index=index)
        result = pdf.groupby(['step_id','equip_id']).apply(kendall,'process','label')
        result = pd.DataFrame(result).reset_index()
        pdf['label'] = pdf.label.astype('int')
        result_ = pdf.groupby(['step_id', 'equip_id'])['label'].mean().reset_index()
        result = pd.merge(result, result_, on=['step_id', 'equip_id'], how='left')
        result.columns = ['step_id', 'equip_id', 'corr', 'N', 'ratio']
        return result

【讨论】:

以上是关于使用 pyspark 在某些类中应用函数时引发“PicklingError”错误的主要内容,如果未能解决你的问题,请参考以下文章

spark pyspark mllib 模型 - 当使用 map 生成预测 rdd 时,它会在 collect() 上引发异常

在 Pyspark 中读取 CSV 文件引发错误 FileNotFound 错误

在 PySpark 数据框中的组中的列上应用函数

Pyspark - 如何将函数仅应用于 DataFrame 中的列子集?

如何按行将函数应用于 PySpark 数据帧的一组列?

Pyspark - 调用 pandas_udf 时出错,结果返回 Series.interpolate()