如何在用户定义的函数(又名 udf)中返回 Pandas.Series?
Posted
技术标签:
【中文标题】如何在用户定义的函数(又名 udf)中返回 Pandas.Series?【英文标题】:How to return a Pandas.Series in a user defined function, aka udf? 【发布时间】:2019-07-31 15:19:03 【问题描述】:在提问之前,我想指出我对 Spark 没有任何了解,我上周在研究团队的一个项目中开始搞乱它。我必须对他们的代码进行一些调整,我很难使用pandas_udf
函数。
研究团队代码有一个包含一些方法的类,并且在该类之外有 5 个pandas_udf
方法使用全局变量来操作数据。我正在做的修改是取出那些全局变量并在类中获取这 5 个 pandas_udf
方法,但我这样做时出错了。
代码如下:
class Signal(object):
# some methods, __init__, etc, etc
def propagate(self, column1):
signal_list = []
# some code and return a list
return signal_list
@pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
def propagate_signal(self, signal_column):
return signal_column.apply(
self.propagate,
column1=signal_column
)
def execute(self):
# some code...
dataframe = dataframe.withColumn('col_name', self.propagate_signal(dataframe['col_name']))
在这里我遇到了一个错误,我做了一些研究,比较了两个代码,在调试模式下同时执行了它们,并发现当 pandas_udf
方法被执行时,self 参数不是预期的,而是导致错误,而该方法的原始代码没有 self 参数并且执行得很好。然后我有了一个想法,嵌套函数。
def propagate_signal(self, signal_column):
@pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
def propagate_signal_nested(signal_column):
signal_column.apply(self.propagate, column1=signal_column)
return = propagate_signal_nested(signal_column)
它有效(我认为它有效,因为它从这里传递,它不是从这一行传递),但在这一行之后,我无法将此 spark 数据帧转换为 pandas 数据帧(请注意,在此行之前我可以调用 dataframe.toPandas() 并且它被执行并且我可以看到数据帧),当我在该行之后调用 dataframe.toPandas()
时,我收到错误 TypeError: Return type of the user-defined function should be Pandas.Series, but is <class 'NoneType'>
并且还收到错误 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1 times; aborting job
我试图搜索了一下,得到了一些关于parallelize
的建议,增加内存,但是没用。
在整个历史之后,这里有一个简单的问题,这个 pandas_udf
函数是否正确,我在返回时做错了什么,我怎样才能让它返回 pandas.Series?如果有人可以帮忙,请。感谢您的关注。
【问题讨论】:
【参考方案1】:您在pandas_udf
中似乎没有return
语句。所以默认情况下,python 会为您返回None
。
试试这样的:
@pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
def propagate_signal_nested(signal_column):
ps = signal_column.apply(self.propagate, column1=signal_column)
return ps
【讨论】:
以上是关于如何在用户定义的函数(又名 udf)中返回 Pandas.Series?的主要内容,如果未能解决你的问题,请参考以下文章