从Pyspark UDF调用另一个自定义Python函数

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从Pyspark UDF调用另一个自定义Python函数相关的知识,希望对你有一定的参考价值。

假设你有一个文件,我们称之为udfs.py并在其中:

def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

然后,您想从main_f函数中创建一个UDF并在数据帧上运行它:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

如果我们在与定义两个函数的同一文件(udfs.py)中执行此操作,则此工作正常。但是,尝试从不同的文件(比如main.py)执行此操作会产生错误ModuleNotFoundError: No module named ...

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

我注意到,如果我实际上将nested_f嵌入main_f中,就像这样:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

一切顺利。但是,我的目标是在多个函数中很好地分离逻辑,我也可以单独测试。

我认为这可以通过使用udfs.pyspark.sparkContext.addPyFile('...udfs.py')文件(或整个压缩文件夹)提交给执行程序来解决。然而:

  1. 我觉得这有点啰嗦(特别是如果你需要拉链文件夹......)
  2. 这并不总是容易/可能(例如udfs.py可能正在使用许多其他模块,然后也需要提交,导致一些连锁反应...)
  3. 还有一些其他的不便与addPyFile(例如autoreload can stop working等)

所以问题是:有没有办法同时做所有这些:

  • 让UDF的逻辑很好地分解为几个Python函数
  • 从与定义逻辑的位置不同的文件中使用UDF
  • 不需要使用addPyFile提交任何依赖项

奖励积分,以澄清这是如何工作/为什么这不起作用!

答案

对于小的(一个或两个本地文件)依赖项,您可以使用--py-files并枚举它们,使用更大或更多的依赖项 - 最好将它打包到zip或egg文件中。

文件udfs.py

def my_function(*args, **kwargs):
    # code

文件main.py

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

运行:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

如果您已经编写了自己的Python模块甚至是第三方模块(不需要C编译),我个人需要使用geoip2,最好创建一个zip或egg文件。

# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is 
pip install -r requirements.txt -t ./src

# If you need add some additionals files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

在使用pyspark --master yarn的pyspark shell中使用--py-files(可能还有其他非本地主选项)时要小心:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule

编辑 - 关于如何在没有addPyFile ()--py-files的情况下获取执行程序函数的问题的答案:

有必要在一个给定的文件中包含各个执行程序的函数。并可通过PATH环境访问。因此,我可能会编写一个Python模块,然后我将其安装在执行程序中并在环境中可用。

以上是关于从Pyspark UDF调用另一个自定义Python函数的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark UDF 中自定义 Python 对象的使用

PySpark 自定义 UDF ModuleNotFoundError: No module named

pyspark中未定义的函数UDF?

Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串

Pyspark 使用 udf 处理数组列并返回另一个数组

Pyspark UDF 广播变量未定义仅在由单独脚本导入时