使用 spark.sparkContext.addPyFile 导入 Pandas UDF
Posted
技术标签:
【中文标题】使用 spark.sparkContext.addPyFile 导入 Pandas UDF【英文标题】:Import Pandas UDF With spark.sparkContext.addPyFile 【发布时间】:2021-08-30 21:50:30 【问题描述】:我有以下玩具示例。我正在使用引导脚本将 pandas 和 pyarrow 安装到我的工作节点上。当我在 jupyter notebook 中运行以下代码时,它运行时没有错误。
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
@f.pandas_udf("float")
def udf_multiply(a: pd.Series, b: pd.Series) -> pd.Series:
df = pd.DataFrame('a': a, 'b': b)
df['product'] = df.apply(lambda x : multiply_func(x['a'], x['b']), axis = 1)
return df['product']
x = pd.Series([1, 2, 3])
#print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(udf_multiply(f.col("x"), f.col("x"))).show()
但是,我有很多 pandas_udf 想要导入到我的工作区,我不想将它们中的每一个都复制粘贴到我的 Jupyter Notebook 顶部。我想要的目录结构如下所示:
eda.ipynb
helpful_pandas_udfs/toy_example.py
我查看了其他 SO 帖子并确定我应该能够像这样添加 Python 文件:
spark.sparkContext.addPyFile("helpful_pandas_udfs/toy_example.py")
from toy_example import udf_multiply
但是,当我尝试运行此代码时,出现以下错误:
AttributeError: 'NoneType' object has no attribute '_jvm'
请帮忙!我完全被这件事难住了。
【问题讨论】:
要添加的一点是,我相信可能会发生此错误,因为我没有在要从中导入的文件中定义 Spark 上下文。但是,我不确定如何解决这个问题。 我认为这个问题类似于这里的 SO 帖子:***.com/questions/55688664/…。但是,建议的解决方案对我不起作用。 【参考方案1】:我可以通过在创建 spark 会话后将我的 UDF 复制为文本来解决这个问题。这不是我满意的解决方案,但它确实有效。
【讨论】:
以上是关于使用 spark.sparkContext.addPyFile 导入 Pandas UDF的主要内容,如果未能解决你的问题,请参考以下文章
在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?
Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)