Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?
Posted
技术标签:
【中文标题】Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?【英文标题】:Pyspark and using UDFs: How to Pass Python Arguments (sys.argv, argparse) to Python Worker? 【发布时间】:2021-03-08 17:28:09 【问题描述】:我正在使用 Spark 3.0.1 和 Python 3.6.8 作为独立应用程序通过 spark-submit 运行脚本:
spark-submit [spark-confs] pyspark-script.py --args1 val1 --args2 val2
脚本运行良好。使用 argparse 我可以获取 args1 和 args2,但是,当我在脚本中引入 UDF 时:
my_udf = udf(lambda x: my_func(x))
df = df.withColumn(cat, my_udf(cat))
我收到以下错误:
pyspark.sql.utils.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/opt/zoran/python-3.6/lib/python3.6/site-packages/pyspark-script.py", line 24, in main
with open(args.args1) as f:
TypeError: expected str, bytes or os.PathLike object, not NoneType
我注意到我的论点不再被采纳。我通过硬编码 arg1 和 arg2 的值确认了这一点,并且它像以前一样工作。
问题是,如何将 python 参数传递给底层的 Python Worker?
此外,引入使 PySpark 行为如此的 UDF 会发生什么?
编辑:
在主脚本之外发生了一个 init_config() 调用。它没有包含在if __name__ == '__main__'
中,因此它会被自动调用并且它被设计为被调用一次。但是,(1) Python Worker 以某种方式再次调用它,(2) 因为它无法访问参数而出错。
【问题讨论】:
【参考方案1】:documentation 谈到了应用程序参数
传递给主类的主要方法的参数(如果有)
这意味着参数仅在 Spark 驱动程序的上下文中可用。由于 udf 在远程 Python 进程中的远程执行器上执行,因此它们无法访问参数。
将参数传输到执行程序进程的一种方法是使用broadcasts。
【讨论】:
有趣的是,我还添加了一个编辑,以了解对后续 Python 工作者的调用是如何被触发的。它可能最终需要成为一个单独的问题。还会做更多的挖掘工作。以上是关于Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?
如何将标量 Pyspark UDF 转换为 Pandas UDF?
如何在 Scala Spark 项目中使用 PySpark UDF?
如何在pyspark withcolumn中使用udf和class
如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?