我可以将 spark 数据帧作为参数发送给 pandas UDF

Posted

技术标签:

【中文标题】我可以将 spark 数据帧作为参数发送给 pandas UDF【英文标题】:Can I send a spark dataframe as an argument to pandas UDF 【发布时间】:2020-11-26 04:42:42 【问题描述】:

是否可以将 spark 数据帧作为参数发送到 pandas UDF 并获取 pandas 数据帧作为返回。下面是我正在使用的示例代码集,在调用函数时出现错误:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf



spark = SparkSession \
    .builder \
    .appName("PrimeBatch") \
    .master("local[*]") \
    .getOrCreate()

srcFile = <Some CSV file>
df = spark.read.option("header",True)\
    .csv(srcFile)

# Declare the function and create the UDF
@pandas_udf("Count int")
def count_udf(v: pd.DataFrame) -> pd.DataFrame:
    return v.count()

p_df = count_udf(df)
p_df

我在运行代码时遇到的错误如下:

TypeError:参数无效,不是字符串或列: 类型的 DataFrame[]。对于列字面量,请使用“lit”、“array”、“struct”或“create_map”函数。

提前致谢!!!

【问题讨论】:

【参考方案1】:

一般来说,Pandas UDF 将采用 Pandas.Series。您定义的 count_udf 函数只是一个普通函数,它接受一个 pandas DataFrame 并返回一个 pandas DataFrame

如果您想将 spark DataFrame 转换为 pandas DataFrame,那么您可以尝试以下操作:

pandas_df  = df.toPandas()

您可以参考以下链接以更好地了解如何应用 panda UDF:

    Introducing vectorized udfs for pyspark Spark Pandas UDF

【讨论】:

这是有道理的..但是当我在大数据集上尝试 toPandas 时失败了..比如说一个 10+ gb 的文件..我认为 spark 试图将整个数据集加载到内存中...... Pandas DataFrame 需要适合驱动程序内存,因此不建议在大型数据集上使用此方法。

以上是关于我可以将 spark 数据帧作为参数发送给 pandas UDF的主要内容,如果未能解决你的问题,请参考以下文章

我如何将form_id作为参数发送给javascript函数?

将包含 Vector 作为特征的 Spark 数据帧转换为 CSV 文件

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

通过将键作为列将 json 字典转换为 spark 数据帧

将大型 Spark 数据帧作为镶木地板写入 s3 存储桶

如何将回调作为参数传递给另一个函数