无法在 Databricks 中使用 Pandas UDF
Posted
技术标签:
【中文标题】无法在 Databricks 中使用 Pandas UDF【英文标题】:Unable to use Pandas UDF in Databricks 【发布时间】:2021-04-16 17:52:32 【问题描述】:我必须运行一个脚本,该脚本将一些参数作为输入并返回一些结果作为输出,所以首先我在本地机器上开发了它 - 工作正常 - 我现在的目标是在 Databricks 中运行它以并行化它.
当我尝试并行化它时,问题就出现了。我正在从已安装的 Datalake 中获取数据(问题不存在,因为我能够在读取 DataFrame 后打印它),将其转换为 Spark DataFrame 并将每一行传递给按材料分组的主函数:
import pandas as pd
import os
import numpy as np
import scipy.stats as stats
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType
# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
StructField('Alpha', IntegerType(), True),
StructField('Beta', IntegerType(), True),
StructField('Sales', IntegerType(), True),
StructField('SL', FloatType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
material = data['Material'].iloc[0]
print(material) #<-------- THIS IS NOT PRINTING
print('Hello world') #<------ NEITHER IS THIS
start = data['start '].iloc[0]
end = data['end '].iloc[0]
mu_lt = data['mu_lt'].iloc[0]
sigma_lt = data['sigma_lt'].iloc[0]
df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
for beta in range(1, 2):
for alpha in range(3, 5):
# Do stuff
return df
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
params_spark = spark.createDataFrame(params)
params_spark.groupby('Material').apply(main).show()
我不确定我是否将 DF 正确传递给主函数,甚至声明它是正确的,但主函数中定义的打印和 DF 似乎都没有运行。代码没有抛出错误,但也没有返回任何输出。
【问题讨论】:
print 语句在用于 udf 函数时不起作用。 我如何知道数据(例如材料)被正确读取? 您可以使用显示或显示功能来检查数据框本身。但这就是它的延伸。 Spark 使用惰性求值,但 UDF 只是绕过它,所以我不确定你是否可以用其他方式调试它。 但是如果我在 main 函数中写 'data.show()' 是正常的吗? 你只需要在 executors 上查看这些数据... 【参考方案1】:试试this:
@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])
【讨论】:
日志不是即时的,而是通过这种方式找到了错误!以上是关于无法在 Databricks 中使用 Pandas UDF的主要内容,如果未能解决你的问题,请参考以下文章
DataBricks中pandas.DataFrame.tail的等价物是啥[关闭]
如何在 Databricks pyspark 中导入 Excel 文件
无法在 Databricks 中使用 Configparser 读取配置文件