如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果

Posted

技术标签:

【中文标题】如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果【英文标题】:How to use pandas UDF in pyspark and return result in StructType 【发布时间】:2019-02-22 16:48:58 【问题描述】:

如何在 pyspark 中驱动基于 panda-udf 的列。我写的 udf 如下:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("in_type string, in_var string, in_numer int", PandasUDFType.GROUPED_MAP)

def getSplitOP(in_data):
    if in_data is None or len(in_data) < 1:
        return None
    #Input/variable.12-2017
    splt=in_data.split("/",1)
    in_type=splt[0]

    splt_1=splt[1].split(".",1)
    in_var = splt_1[0]

    splt_2=splt_1[1].split("-",1)
    in_numer=int(splt_2[0])

    return (in_type, in_var, in_numer)
    #Expected output: ("input", "variable", 12)

df = df.withColumn("splt_col", getSplitOP(df.In_data))

谁能帮我找出上面的代码有什么问题,以及为什么它不起作用。

【问题讨论】:

查看执行此代码的结果可能很有用。 为什么在这里使用 pandas udf?您可以使用常规 udf 甚至使用标准 API 函数来执行此操作。 我在寻找 pandas udf,因为它们比普通 udf 快。所以我一直在寻找他们的实现。 我相信 pandas udf 比普通 udf 更快。 【参考方案1】:

这将起作用:

df = spark.createDataFrame([("input/variable.12-2017",), ("output/invariable.11-2018",)], ("in_data",))
df.show()

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("in_type string, in_var string, in_numer int", PandasUDFType.GROUPED_MAP)
def getSplitOP(pdf):
    in_data = pdf.in_data

    #Input/variable.12-2017
    splt = in_data.apply(lambda x: x.split("/",1))
    in_type = splt.apply(lambda x: x[0])

    splt_1 = splt.apply(lambda x: x[1].split(".",1))
    in_var = splt_1.apply(lambda x: x[0])

    splt_2 = splt_1.apply(lambda x: x[1].split("-",1))
    in_numer = splt_2.apply(lambda x: int(x[0]))

    return pd.DataFrame("in_type": in_type, "in_var": in_var, "in_numer": in_numer)
    #Expected output: ("input", "variable", 12)

df = df.groupBy().apply(getSplitOP)
df.show()
@pandas_udf 后面不能有空行。 pandas Series 对象不直接支持 split 等字符串函数。使用 apply 对每个系列进行元素操作。 您使用 GROUPED_MAP 来返回多个列,但您的代码本身并没有按任何内容分组。请注意,此处使用的 groupBy 没有任何参数。这要求所有数据都适合单个处理器。

【讨论】:

以上是关于如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

如何在 jupyter 中像 pandas Dataframe 一样打印 Pyspark Dataframe

如何在 PySpark 中复制 Pandas 的 between_time 函数

如何在 PySpark 中将多个参数传递给 Pandas UDF?

如何在 PySpark 上将所有功能组合成一列?

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?