Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?

Posted

技术标签:

【中文标题】Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?【英文标题】:Pyspark and using UDFs: How to Pass Python Arguments (sys.argv, argparse) to Python Worker? 【发布时间】:2021-03-08 17:28:09 【问题描述】:

我正在使用 Spark 3.0.1 和 Python 3.6.8 作为独立应用程序通过 spark-submit 运行脚本:

spark-submit [spark-confs]  pyspark-script.py --args1 val1 --args2 val2

脚本运行良好。使用 argparse 我可以获取 args1 和 args2,但是,当我在脚本中引入 UDF 时:

   my_udf = udf(lambda x: my_func(x))
   df = df.withColumn(cat, my_udf(cat))

我收到以下错误:

pyspark.sql.utils.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/zoran/python-3.6/lib/python3.6/site-packages/pyspark-script.py", line 24, in main
    with open(args.args1) as f:
TypeError: expected str, bytes or os.PathLike object, not NoneType

我注意到我的论点不再被采纳。我通过硬编码 arg1 和 arg2 的值确认了这一点,并且它像以前一样工作。

问题是,如何将 python 参数传递给底层的 Python Worker?

此外,引入使 PySpark 行为如此的 UDF 会发生什么?

编辑: 在主脚本之外发生了一个 init_config() 调用。它没有包含在if __name__ == '__main__' 中,因此它会被自动调用并且它被设计为被调用一次。但是,(1) Python Worker 以某种方式再次调用它,(2) 因为它无法访问参数而出错。

【问题讨论】:

【参考方案1】:

documentation 谈到了应用程序参数

传递给主类的主要方法的参数(如果有)

这意味着参数仅在 Spark 驱动程序的上下文中可用。由于 udf 在远程 Python 进程中的远程执行器上执行,因此它们无法访问参数。

将参数传输到执行程序进程的一种方法是使用broadcasts。

【讨论】:

有趣的是,我还添加了一个编辑,以了解对后续 Python 工作者的调用是如何被触发的。它可能最终需要成为一个单独的问题。还会做更多的挖掘工作。

以上是关于Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

如何将标量 Pyspark UDF 转换为 Pandas UDF?

如何将行传递到pyspark udf

如何在 Scala Spark 项目中使用 PySpark UDF?

如何在pyspark withcolumn中使用udf和class

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?