使用 spark.sparkContext.addPyFile 导入 Pandas UDF

Posted

技术标签:

【中文标题】使用 spark.sparkContext.addPyFile 导入 Pandas UDF【英文标题】:Import Pandas UDF With spark.sparkContext.addPyFile 【发布时间】:2021-08-30 21:50:30 【问题描述】:

我有以下玩具示例。我正在使用引导脚本将 pandas 和 pyarrow 安装到我的工作节点上。当我在 jupyter notebook 中运行以下代码时,它运行时没有错误。

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

@f.pandas_udf("float")
def udf_multiply(a: pd.Series, b: pd.Series) -> pd.Series:
    df = pd.DataFrame('a': a, 'b': b)
    df['product'] = df.apply(lambda x : multiply_func(x['a'], x['b']), axis = 1)
    return df['product']

x = pd.Series([1, 2, 3])
#print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(udf_multiply(f.col("x"), f.col("x"))).show()

但是,我有很多 pandas_udf 想要导入到我的工作区,我不想将它们中的每一个都复制粘贴到我的 Jupyter Notebook 顶部。我想要的目录结构如下所示:

eda.ipynb
helpful_pandas_udfs/toy_example.py

我查看了其他 SO 帖子并确定我应该能够像这样添加 Python 文件:

spark.sparkContext.addPyFile("helpful_pandas_udfs/toy_example.py")
from toy_example import udf_multiply

但是,当我尝试运行此代码时,出现以下错误:

AttributeError: 'NoneType' object has no attribute '_jvm'

请帮忙!我完全被这件事难住了。

【问题讨论】:

要添加的一点是,我相信可能会发生此错误,因为我没有在要从中导入的文件中定义 Spark 上下文。但是,我不确定如何解决这个问题。 我认为这个问题类似于这里的 SO 帖子:***.com/questions/55688664/…。但是,建议的解决方案对我不起作用。 【参考方案1】:

我可以通过在创建 spark 会话后将我的 UDF 复制为文本来解决这个问题。这不是我满意的解决方案,但它确实有效。

【讨论】:

以上是关于使用 spark.sparkContext.addPyFile 导入 Pandas UDF的主要内容,如果未能解决你的问题,请参考以下文章

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

今目标使用教程 今目标任务使用篇

Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)

MySQL db 在按日期排序时使用“使用位置;使用临时;使用文件排序”

使用“使用严格”作为“使用强”的备份

Kettle java脚本组件的使用说明(简单使用升级使用)