无法在 Databricks 中使用 Pandas UDF

Posted

技术标签:

【中文标题】无法在 Databricks 中使用 Pandas UDF【英文标题】:Unable to use Pandas UDF in Databricks 【发布时间】:2021-04-16 17:52:32 【问题描述】:

我必须运行一个脚本,该脚本将一些参数作为输入并返回一些结果作为输出,所以首先我在本地机器上开发了它 - 工作正常 - 我现在的目标是在 Databricks 中运行它以并行化它.

当我尝试并行化它时,问题就出现了。我正在从已安装的 Datalake 中获取数据(问题不存在,因为我能够在读取 DataFrame 后打印它),将其转换为 Spark DataFrame 并将每一行传递给按材料分组的主函数:

import pandas as pd
import os
import numpy as np
import scipy.stats as stats

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType

# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
                    StructField('Alpha', IntegerType(), True),
                    StructField('Beta', IntegerType(), True),
                    StructField('Sales', IntegerType(), True),
                    StructField('SL', FloatType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
    material = data['Material'].iloc[0]
    print(material)      #<-------- THIS IS NOT PRINTING
    print('Hello world')   #<------ NEITHER IS THIS

    start = data['start '].iloc[0]
    end = data['end '].iloc[0]
    mu_lt = data['mu_lt'].iloc[0]
    sigma_lt = data['sigma_lt'].iloc[0]
    
    df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
    
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Do stuff
    
    return df


if __name__ == '__main__':
  spark = SparkSession.builder.getOrCreate()
  params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
  params_spark = spark.createDataFrame(params) 

  params_spark.groupby('Material').apply(main).show()

我不确定我是否将 DF 正确传递给主函数,甚至声明它是正确的,但主函数中定义的打印和 DF 似乎都没有运行。代码没有抛出错误,但也没有返回任何输出。

【问题讨论】:

print 语句在用于 udf 函数时不起作用。 我如何知道数据(例如材料)被正确读取? 您可以使用显示或显示功能来检查数据框本身。但这就是它的延伸。 Spark 使用惰性求值,但 UDF 只是绕过它,所以我不确定你是否可以用其他方式调试它。 但是如果我在 main 函数中写 'data.show()' 是正常的吗? 你只需要在 executors 上查看这些数据... 【参考方案1】:

试试this:

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])

【讨论】:

日志不是即时的,而是通过这种方式找到了错误!

以上是关于无法在 Databricks 中使用 Pandas UDF的主要内容,如果未能解决你的问题,请参考以下文章

DataBricks中pandas.DataFrame.tail的等价物是啥[关闭]

如何在 Databricks pyspark 中导入 Excel 文件

无法在 Databricks 中使用 Configparser 读取配置文件

无法在 Databricks 中使用 SecretKey

ModuleNotFoundError:databricks 中没有名为“xlsxwriter”的模块

无法在 Databricks 中使用 pyspark 读取 json 文件