Pyspark:使用 udf 多次加载模型

Posted

技术标签:

【中文标题】Pyspark:使用 udf 多次加载模型【英文标题】:Pyspark : Model Loading multiple times with udf 【发布时间】:2021-12-24 16:46:56 【问题描述】:

尝试将 udf 应用于基于某些条件进行模型预测的大型 csv 文件,但由于某种原因,该模型被多次加载。以下是流程的示例 sn-p:

# main.py loads predict.py

from predict import predict_udf

data = spark.read("csv_path")
data.show()

| Column1  | Column2 |
| -------- | ------- |
|   Class1 |         |
|   Class2 |         |

data.withColumn("Column2",predict_udf(col("Column1"))
# predict.py

model = load_model() # Initialising model object

def predict(class_name):
    if class_name == "Class1":
        # Do something

    elif class_name == "Class2":
        # Do something else

predict_udf = udf(predict, StringType())

理想情况下,我希望模型被加载一次,因为它被定义为全局变量,但模型被多次加载。

【问题讨论】:

【参考方案1】:

您可以尝试broadcasting该模型以避免不必要的重复读取。

model = load_model() # Initialising model object
model_bc = spark.sparkContext.broadcast(model)

然后您将能够通过model_bc.value 属性访问udf 中的初始model

这是一个很好的example,说明了完整的解决方案。

【讨论】:

以上是关于Pyspark:使用 udf 多次加载模型的主要内容,如果未能解决你的问题,请参考以下文章

Pandas UDF (PySpark) - 不正确的类型错误

在 PySpark 中重新加载 UDF

如何在 Scala Spark 项目中使用 PySpark UDF?

pyspark 中的重型有状态 UDF

如何将 txt 文件转换为 parquet 文件并将其加载到 hdfs table-pyspark

udf(用户定义函数)如何在 pyspark 中工作?