如何在用户定义的函数(又名 udf)中返回 Pandas.Series?

Posted

技术标签:

【中文标题】如何在用户定义的函数(又名 udf)中返回 Pandas.Series?【英文标题】:How to return a Pandas.Series in a user defined function, aka udf? 【发布时间】:2019-07-31 15:19:03 【问题描述】:

在提问之前,我想指出我对 Spark 没有任何了解,我上周在研究团队的一个项目中开始搞乱它。我必须对他们的代码进行一些调整,我很难使用pandas_udf 函数。

研究团队代码有一个包含一些方法的类,并且在该类之外有 5 个pandas_udf 方法使用全局变量来操作数据。我正在做的修改是取出那些全局变量并在类中获取这 5 个 pandas_udf 方法,但我这样做时出错了。

代码如下:

class Signal(object):
    # some methods, __init__, etc, etc


    def propagate(self, column1):
        signal_list = []
        # some code and return a list
        return signal_list

    @pandas_udf(ArrayType(StringType()),  PandasUDFType.SCALAR)
    def propagate_signal(self, signal_column):
        return signal_column.apply(
            self.propagate,
            column1=signal_column
        )

    def execute(self):
        # some code...
        dataframe = dataframe.withColumn('col_name', self.propagate_signal(dataframe['col_name']))

在这里我遇到了一个错误,我做了一些研究,比较了两个代码,在调试模式下同时执行了它们,并发现当 pandas_udf 方法被执行时,self 参数不是预期的,而是导致错误,而该方法的原始代码没有 self 参数并且执行得很好。然后我有了一个想法,嵌套函数。

def propagate_signal(self, signal_column):
    @pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
    def propagate_signal_nested(signal_column):
        signal_column.apply(self.propagate, column1=signal_column)

    return = propagate_signal_nested(signal_column)

它有效(我认为它有效,因为它从这里传递,它不是从这一行传递),但在这一行之后,我无法将此 spark 数据帧转换为 pandas 数据帧(请注意,在此行之前我可以调用 dataframe.toPandas() 并且它被执行并且我可以看到数据帧),当我在该行之后调用 dataframe.toPandas() 时,我收到错误 TypeError: Return type of the user-defined function should be Pandas.Series, but is <class 'NoneType'> 并且还收到错误 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1 times; aborting job 我试图搜索了一下,得到了一些关于parallelize的建议,增加内存,但是没用。

在整个历史之后,这里有一个简单的问题,这个 pandas_udf 函数是否正确,我在返回时做错了什么,我怎样才能让它返回 pandas.Series?如果有人可以帮忙,请。感谢您的关注。

【问题讨论】:

【参考方案1】:

您在pandas_udf 中似乎没有return 语句。所以默认情况下,python 会为您返回None。 试试这样的:

    @pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
    def propagate_signal_nested(signal_column):
        ps = signal_column.apply(self.propagate, column1=signal_column)
        return ps

【讨论】:

以上是关于如何在用户定义的函数(又名 udf)中返回 Pandas.Series?的主要内容,如果未能解决你的问题,请参考以下文章

udf(用户定义函数)如何在 pyspark 中工作?

在 PostgreSQL 中返回表的用户定义函数

[Hive]Hive自定义函数UDF

如何从 arangodb udf 中的集合返回数据

如何在用户定义的函数中使用自定义类型?

hive之udf函数的使用