Pandas UDF 函数在大数据上完成的时间异常长

Posted

技术标签:

【中文标题】Pandas UDF 函数在大数据上完成的时间异常长【英文标题】:Pandas UDF Function Takes Unusually Long to Complete on Big Data 【发布时间】:2021-08-30 12:05:39 【问题描述】:

我是 PySpark 和 Pandas UDF 的新手,我正在运行以下 Pandas UDF 函数来混淆包含字符串的列(例如:输入 'Luke' 将导致 'ulek')

pandas_udf("string")
def jumble_string(column: pd.Series)-> pd.Series:
  return column.apply(lambda x: None if x==None else ''.join(random.sample(x, len(x))).lower()) 

spark_df = spark_df.withColumn("names", jumble_string("names"))

在大型数据集上运行上述函数时,我注意到执行时间异常长。

我猜.apply 函数与这个问题有关。

无论如何我可以重写这个函数,以便它可以有效地在大数据集上执行吗? 请指教

【问题讨论】:

【参考方案1】:

Spark 中没有实现将列中的字符串打乱的函数,因此我们不得不求助于 UDF 或 Pandas UDF。

您的解决方案实际上非常好;也许我们可以通过从系列中删除.apply 方法并在每行的字符串上仅使用基础 Python 来改进它。

@pandas_udf("string")
def jumble_string_new(column: pd.Series)-> pd.Series:
  x = column.iloc[0]   # each pd.Series is made only of one element
  if x is None:
    return pd.Series([None])
  else:
    return pd.Series([''.join(random.sample(x, len(x))).lower()])

结果与你的函数相同;但是,我无法在非常大的数据帧上对其进行测试。自己尝试一下,看看它的计算效率是否更高。

【讨论】:

我一定会试试这个并回复你 我尝试使用 spark_df.withColumn("first_name", shuffle_string("first_name")) 在 500 行的数据集上运行它并得到错误 PythonException: 'RuntimeError: Result vector from pandas_udf was not the required length: expected 500, got 1' 您收到此错误是因为这种 Pandas UDF(SCALAR 类型)需要 pd.Series 作为输出,而不是单个字符串。顺便说一句,shuffle_string 是什么?它是在哪里定义的? 我的错,shuffle_stringjumble_string 是一样的 我在一个非常小的数据集上尝试了我提出的jumble_string_new,它运行得很顺利。你确定Pandas UDF的输入输出实际上是pd.Series吗?【参考方案2】:

由于.apply 方法没有向量化,因此给定的操作是通过循环遍历元素来完成的,这会随着数据量的变大而减慢执行速度。

对于小型数据,时间差通常可以忽略不计。然而,随着尺寸的增加,差异开始变得明显。我们可能会处理大量数据,因此应始终考虑时间。

您可以阅读更多关于 Apply vs Vectorized Operations here.

因此我决定使用列表推导,这确实略微提高了我的表现。

@pandas_udf("string")
def jumble_string(column: pd.Series)-> pd.Series:
  return pd.Series([None if x==None else ''.join(random.sample(x, len(x))).lower() for x in column])

【讨论】:

以上是关于Pandas UDF 函数在大数据上完成的时间异常长的主要内容,如果未能解决你的问题,请参考以下文章

Pandas UDF 函数中无法识别的函数

Pandas UDF Facebook Prophet / 多个参数

将多行结构化流式传输到 pandas udf

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

pandas_udf结果无法写入表

带有 Pandas 矢量化 UDF 的 Spark 3