Pandas UDF 函数中无法识别的函数

Posted

技术标签:

【中文标题】Pandas UDF 函数中无法识别的函数【英文标题】:Functions not recognised inside Pandas UDF function 【发布时间】:2020-05-10 00:10:46 【问题描述】:

我在 Pyspark 上使用 Pandas UDF。

我有一个主文件 __main_.py,其中包含:

from pyspark.sql import SparkSession
from run_udf import compute


def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()

还有一个 run_udf.py 文件,其中包含我的 UDF 函数和另一个函数(将单个变量乘以 2):

from pyspark.sql.functions import pandas_udf, PandasUDFType


def multi_by_2(x):
    return 2 * x


def compute(df):

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df

通过运行 main.py,我收到以下错误:“没有名为 'run_udf' 的模块”。 在此配置中,subtract_mean() 似乎无法访问函数 multi_by_2()。我找到了 2 种方法,但不知道它是否符合最佳实践标准:

方法 1:(将函数移动到计算内部 - 不理想,因为我每次使用另一个 pandas_udf() 函数时都会复制该函数 - 我们失去了“可重用”函数的概念) .

def compute(df):
    def multi_by_2(x):
        return 2 * x
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)


    return df

方法二:将乘法函数作为compute的参数传入。

__main_.py

from pyspark.sql import SparkSession
from run_udf import compute
def multi_by_2(x):
    return 2 * x

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df, multi_by_2)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()

run_udf.py from pyspark.sql.functions import pandas_udf, PandasUDFType

def compute(df, multi_by_2):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)


    return df

我发现的两个解决方案似乎有点老套。有没有更好的方法来解决这个问题?

【问题讨论】:

【参考方案1】:

我知道这个回复是在您发布问题后的一段时间,但我希望它仍然可以提供帮助!

您想将其包装在嵌套函数中的原因是什么?此外,据我所知,使用 spark 数据框作为参数调用函数并不常见,因此也许您可以为您的主脚本尝试以下类似操作:

from pyspark.sql import SparkSession
from run_udf import substract_mean_udf

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df =df.groupby("id").apply(subtract_mean_udf)
    df.show()
    spark.stop()

if __name__ == "__main__":
    main()

run_udf.py 脚本如下:

from pyspark.sql.functions import pandas_udf, PandasUDFType

def multi_by_2(x):
    return 2 * x

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean_udf(pdf):
    # pdf is a pandas.DataFrame
    return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())

大部分信息取自关于 Pandas UDF 的 Databricks 笔记本。

你也可以侥幸逃脱

return pdf.assign(v=pdf.v*2 - pdf.v.mean())

但我还没有测试过,所以我不能 100% 确定。

【讨论】:

以上是关于Pandas UDF 函数中无法识别的函数的主要内容,如果未能解决你的问题,请参考以下文章

appdesign中的函数或变量参数无法识别

使用具有正确语法的 pandas to_datetime() 方法,无法识别的值类型:str?

Swift 3:将无法识别的选择器发送到手势识别器实例

c++程序中,无法识别中文路径怎么办

pandas_udf结果无法写入表

无法在 pyspark 中应用标量 pandas udf