从Pyspark UDF调用另一个自定义Python函数
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从Pyspark UDF调用另一个自定义Python函数相关的知识,希望对你有一定的参考价值。
假设你有一个文件,我们称之为udfs.py
并在其中:
def nested_f(x):
return x + 1
def main_f(x):
return nested_f(x) + 1
然后,您想从main_f
函数中创建一个UDF并在数据帧上运行它:
import pyspark.sql.functions as fn
import pandas as pd
pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)
_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
如果我们在与定义两个函数的同一文件(udfs.py
)中执行此操作,则此工作正常。但是,尝试从不同的文件(比如main.py
)执行此操作会产生错误ModuleNotFoundError: No module named ...
:
...
import udfs
_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
我注意到,如果我实际上将nested_f
嵌入main_f
中,就像这样:
def main_f(x):
def nested_f(x):
return x + 1
return nested_f(x) + 1
一切顺利。但是,我的目标是在多个函数中很好地分离逻辑,我也可以单独测试。
我认为这可以通过使用udfs.py
将spark.sparkContext.addPyFile('...udfs.py')
文件(或整个压缩文件夹)提交给执行程序来解决。然而:
- 我觉得这有点啰嗦(特别是如果你需要拉链文件夹......)
- 这并不总是容易/可能(例如
udfs.py
可能正在使用许多其他模块,然后也需要提交,导致一些连锁反应...) - 还有一些其他的不便与
addPyFile
(例如autoreload can stop working等)
所以问题是:有没有办法同时做所有这些:
- 让UDF的逻辑很好地分解为几个Python函数
- 从与定义逻辑的位置不同的文件中使用UDF
- 不需要使用
addPyFile
提交任何依赖项
奖励积分,以澄清这是如何工作/为什么这不起作用!
对于小的(一个或两个本地文件)依赖项,您可以使用--py-files并枚举它们,使用更大或更多的依赖项 - 最好将它打包到zip或egg文件中。
文件udfs.py
:
def my_function(*args, **kwargs):
# code
文件main.py
:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function
sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)
df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))
运行:
pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py
如果您已经编写了自己的Python模块甚至是第三方模块(不需要C编译),我个人需要使用geoip2
,最好创建一个zip或egg文件。
# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src
# Best is
pip install -r requirements.txt -t ./src
# If you need add some additionals files
cp ./some_scripts/* ./src/
# And pack it
cd ./src
zip -r ../libs.zip .
cd ..
pyspark --py-files libs.zip
spark-submit --py-files libs.zip
在使用pyspark --master yarn
的pyspark shell中使用--py-files
(可能还有其他非本地主选项)时要小心:
>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip') # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule # libs.zip/MyModule
编辑 - 关于如何在没有addPyFile ()
和--py-files
的情况下获取执行程序函数的问题的答案:
有必要在一个给定的文件中包含各个执行程序的函数。并可通过PATH环境访问。因此,我可能会编写一个Python模块,然后我将其安装在执行程序中并在环境中可用。
以上是关于从Pyspark UDF调用另一个自定义Python函数的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 自定义 UDF ModuleNotFoundError: No module named