How do you save a custom python class object in Azure Databricks?


Posted 2021-12-17 10:34:49

[Question]:

I've written a python class for pre-processing data before classification modelling. I built it with sklearn-style fit/transform functionality, and that's where the problem lies: after fitting the class object, I need to be able to save it and load it in a production environment.

I'm working in Azure Databricks, and I've tried saving the object to a mounted ADLS gen2 data lake with pickle and with joblib, but I run into a 'FileNotFoundError'. I'm also open to MLflow or other alternatives if that's possible/necessary.

Does anyone know how I can save this kind of object in Azure Databricks?

import joblib
import numpy as np
import pandas as pd

df_train = spark.read.parquet('BinaryClassigicationData.parquet')
df_train = df_train.toPandas()

woe_encoder = WeightOfEvidenceEncoder()

fit_woe = woe_encoder.fit(data=df_train,
                          y_var='y'
                         )

path_root = '/dbfs:/mnt/deltalake/'
path_models = 'MODELS/'

with open(path_root + path_models + 'WoE_Encoder.joblib', 'wb') as file:
  joblib.dump(fit_woe, file)  # save the fitted encoder (the original had an undefined name, 'imp_set')
class WeightOfEvidenceEncoder():

  '''
  WeightOfEvidenceEncoder calculates the 'weight of evidence' of categorical variable classes and recodes them with those values, converting them to numeric variables before classification modelling. The encoder can also calculate the 'information value' of the categorical variable overall and of the classes individually.
  For more info see https://towardsdatascience.com/model-or-do-you-mean-weight-of-evidence-woe-and-information-value-iv-331499f6fc2
  '''

  def __init__(self):
    self.taught = None
    self.x_var = 'auto'
  def get_weight_of_evidence(self, data, dependant_var, independant_var, category):
    
    multi_index = data.groupby([dependant_var, independant_var]).count().iloc[:,0]

    tot_neg = multi_index.xs(False, level=dependant_var).sum()
    tot_pos = multi_index.xs(True, level=dependant_var).sum()

    cat_neg = multi_index.xs((False, category))
    
    try:
      cat_pos = multi_index.xs((True, category))
    except KeyError:  # the category never occurs with a positive outcome
      cat_pos = 0

    perc_neg = cat_neg/tot_neg
    perc_pos = cat_pos/tot_pos

    # A bare try/except around np.log never fires for numpy values (division by
    # zero yields inf with a warning, not an exception), so guard explicitly.
    if perc_pos == 0:
      weight_of_evidence = 0
    else:
      weight_of_evidence = np.log(perc_neg/perc_pos)

    return weight_of_evidence, perc_neg, perc_pos
  
    
  def get_information_value(self, data, dependant_var, independant_var):
    
    '''
    Information Value | Predictive Power
    ------------------|-----------------
    <0.02             | Terrible
    0.02 - 0.1        | Weak
    0.1 - 0.3         | Medium
    0.3 - 0.5         | Strong
    >0.5              | Fishy
    '''
    
  
    df = data
    
    if df[independant_var].dtype == 'O':

      classes = list(df[independant_var].astype('category').cat.categories)

    elif df[independant_var].dtype == 'bool':

      classes = [True, False]

    else:

      raise ValueError('get_information_value: independant_var must be either string or boolean')
      
    df2 = pd.DataFrame(data=None, 
                        columns=['Class', 'WeightOfEvidence', 'PercNegativeOverlap', 'PercPositiveOverlap']
                       ) 

    for independant_class in classes:

      weight_of_evidence, perc_neg, perc_pos = self.get_weight_of_evidence(data=df,
                                                                           dependant_var=dependant_var,
                                                                           independant_var=independant_var,
                                                                           category=independant_class
                                                                           )

      df2.loc[len(df2.index)] = [independant_class, weight_of_evidence, perc_neg, perc_pos]

    df2 = df2.assign(InformationValue = (df2.PercNegativeOverlap - df2.PercPositiveOverlap)*df2.WeightOfEvidence)
    df2 = (df2.assign(TotalInformationValue = df2.InformationValue.sum())
              .sort_values('InformationValue', ascending=False)
          )            

    return df2
  
  
  def fit(self, data, y_var, x_var='auto', include_bools=False, exclude=[None]):
    
    df = data
    
    if x_var == 'auto':
      if include_bools:

        # Note: 'col not in exclude', not '~(col in exclude)' — bitwise ~ on a
        # bool gives -2 or -1, both truthy, so the original exclusion never worked.
        independant_vars = [col for col in df.columns 
                            if ((df[col].dtype == 'O') or (df[col].dtype == 'bool')) 
                            and (col != y_var) and (col not in exclude)
                            ]
      else:
        
        independant_vars = [col for col in df.columns if (df[col].dtype == 'O') 
                            and (col != y_var) and (col not in exclude)
                           ]
      
    else:
      
      independant_vars = [x_var]
      
    independant_var_information = []
    
    for var in independant_vars:
      
      var_information = self.get_information_value(data=df,
                                                   dependant_var=y_var,
                                                   independant_var=var
                                                   )

      independant_var_information.append(var_information)
      
    self.taught = dict(zip(independant_vars, independant_var_information))
    
    return self
  
  def transform(self, data):
    
    for var in self.taught.keys():
      
      var_data = self.taught[var].replace([np.inf, -np.inf], 0)
      var_data = var_data.set_index('Class')
      
      data[var] = data[var].map(var_data['WeightOfEvidence'])
      
    return data
  
  def fit_transform(self, data, y_var, x_var='auto', include_bools=False, exclude=[None]):
    
    df = data
    
    out = (self.fit(data=df, 
                    y_var=y_var, 
                    x_var=x_var, 
                    include_bools=include_bools, 
                    exclude=exclude
                    )
               .transform(data=df)
          )
    
    return out

[Question comments]:

[Answer 1]:

The main problem is that you're trying to use Python's local file API (like open) with a DBFS URL — Python doesn't know about that file system. The solution is to use the local DBFS mount available under /dbfs (this works only if you're not on Databricks Community Edition). So change the code to

path_root = '/dbfs/mnt/deltalake/'

Besides that, you could also look at custom models in MLflow — they are designed precisely for applying custom code to data at inference time. You can find more details in this answer.

P.S. On Databricks Community Edition you can write to the local disk first and then copy to DBFS with dbutils.fs.cp (see this answer for details).
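A minimal sketch of the corrected save/load round trip. On Databricks the mount from the question appears as the local folder '/dbfs/mnt/deltalake/MODELS/'; a temporary directory stands in for it here so the snippet runs anywhere, and pickle stands in for joblib (joblib.dump/load work the same way with an open file handle):

```python
import os
import pickle
import tempfile

# Stand-in for '/dbfs/mnt/deltalake/MODELS/' — note '/dbfs/...', not '/dbfs:/...'.
path_models = tempfile.mkdtemp()

# Stand-in for the fitted WeightOfEvidenceEncoder (any picklable object).
encoder = {'taught': {'colour': 'woe-table-placeholder'}}

target = os.path.join(path_models, 'WoE_Encoder.pkl')
with open(target, 'wb') as file:   # plain open() works because /dbfs/... is a local path
    pickle.dump(encoder, file)

with open(target, 'rb') as file:
    restored = pickle.load(file)

print(restored == encoder)
```

One caveat: unpickling the real encoder in production requires the WeightOfEvidenceEncoder class definition to be importable there as well — which is one reason the answer points at MLflow custom models for deployment.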

[Comments]:
