Pandas UDF 函数在大数据上完成的时间异常长
Posted
技术标签:
【中文标题】Pandas UDF 函数在大数据上完成的时间异常长【英文标题】:Pandas UDF Function Takes Unusually Long to Complete on Big Data 【发布时间】:2021-08-30 12:05:39 【问题描述】:我是 PySpark 和 Pandas UDF 的新手,我正在运行以下 Pandas UDF 函数来混淆包含字符串的列(例如:输入 'Luke' 将导致 'ulek')
pandas_udf("string")
def jumble_string(column: pd.Series)-> pd.Series:
return column.apply(lambda x: None if x==None else ''.join(random.sample(x, len(x))).lower())
spark_df = spark_df.withColumn("names", jumble_string("names"))
在大型数据集上运行上述函数时,我注意到执行时间异常长。
我猜.apply
函数与这个问题有关。
无论如何我可以重写这个函数,以便它可以有效地在大数据集上执行吗? 请指教
【问题讨论】:
【参考方案1】:Spark 中没有实现将列中的字符串打乱的函数,因此我们不得不求助于 UDF 或 Pandas UDF。
您的解决方案实际上非常好;也许我们可以通过从系列中删除.apply
方法并在每行的字符串上仅使用基础 Python 来改进它。
@pandas_udf("string")
def jumble_string_new(column: pd.Series)-> pd.Series:
x = column.iloc[0] # each pd.Series is made only of one element
if x is None:
return pd.Series([None])
else:
return pd.Series([''.join(random.sample(x, len(x))).lower()])
结果与你的函数相同;但是,我无法在非常大的数据帧上对其进行测试。自己尝试一下,看看它的计算效率是否更高。
【讨论】:
我一定会试试这个并回复你 我尝试使用spark_df.withColumn("first_name", shuffle_string("first_name"))
在 500 行的数据集上运行它并得到错误 PythonException: 'RuntimeError: Result vector from pandas_udf was not the required length: expected 500, got 1'
您收到此错误是因为这种 Pandas UDF(SCALAR 类型)需要 pd.Series 作为输出,而不是单个字符串。顺便说一句,shuffle_string
是什么?它是在哪里定义的?
我的错,shuffle_string
和 jumble_string
是一样的
我在一个非常小的数据集上尝试了我提出的jumble_string_new
,它运行得很顺利。你确定Pandas UDF的输入输出实际上是pd.Series吗?【参考方案2】:
由于.apply
方法没有向量化,因此给定的操作是通过循环遍历元素来完成的,这会随着数据量的变大而减慢执行速度。
对于小型数据,时间差通常可以忽略不计。然而,随着尺寸的增加,差异开始变得明显。我们可能会处理大量数据,因此应始终考虑时间。
您可以阅读更多关于 Apply vs Vectorized Operations here.
因此我决定使用列表推导,这确实略微提高了我的表现。
@pandas_udf("string")
def jumble_string(column: pd.Series)-> pd.Series:
return pd.Series([None if x==None else ''.join(random.sample(x, len(x))).lower() for x in column])
【讨论】:
以上是关于Pandas UDF 函数在大数据上完成的时间异常长的主要内容,如果未能解决你的问题,请参考以下文章
Pandas UDF Facebook Prophet / 多个参数