在 pandas udf pyspark 中使用 numpy

Posted

技术标签:

【中文标题】在 pandas udf pyspark 中使用 numpy【英文标题】:Using numpy inside pandas udf pyspark 【发布时间】:2020-04-20 15:38:40 【问题描述】:

我正在尝试定义一个 pandas udf 来计算每个周期的对数正态分布的偏斜。

我目前做了以下事情:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def lognormal_skew(v):
  return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)

my_df.groupBy('period').agg(lognormal_skew(my_df['my_columns'])).show()

但是我得到一个错误:

rg.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3047.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3047.0 (TID 208812, 10.139.64.8, executor 82): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

我的猜测是这与numpy 有关系,因为如果我尝试如下定义倾斜:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def skew(v):
  return v.skew()

my_df.groupBy('period').agg(skew(my_df['my_columns'])).show()

它输出一个DataFrame,它没有错误。

【问题讨论】:

使用 numpy 的代码可以正常工作。确保您使用的是 PyArrow 0.14.1 或更低版本。 【参考方案1】:

序言

根据我的经验,我认为只要可以使用pyspark 内置函数来实现某些东西,它就比用户定义的函数更可取。

udf 的一个问题是错误消息难以解密。例如,在您的情况下,我不知道您为什么会遇到此错误。

pyspark.sql.functions 允许你做很多事情,如果你接受更多的步骤。但是,就性能而言,这将很难被击败,因为这些功能是由专家优化的。如果您想做的事情不能用pyspark.sql.functions 完成(发生这种情况),我更喜欢使用rdd 而不是udfrdd 更自然地应用 Python 函数。相对于内置的 DataFrame 方法,您会降低性能,但您会获得一些灵活性。

也许一个关于你的问题的例子可能是有启发性的。

Python

让我们以基于 numpy 的示例为例。你给了python 实现:

import numpy as np
def lognormal_skew_numpy(v):
    return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)

可以用来控制其他实现是否一致:

print(lognormal_skew_numpy(np.array([1,3,5])))
print(lognormal_skew_numpy(np.array([5,6])))
# 14.448897615797454
# 2.938798148174726

DataFrame API 逻辑

现在,让我们转至Spark。我将使用以下DataFrame

df = spark.createDataFrame([(1, 'a'), (3, 'a'), (5, 'a'), (5,'b'), (6,'b')], ['x','period'])
df.show(2)

+---+------+
|  x|period|
+---+------+
|  1|     a|
|  3|     a|
+---+------+
only showing top 2 rows

偏度函数只执行基本的数学运算。它们都在pyspark.sql.functions 中实现,因此在这种情况下创建一个执行此操作的函数并不难

import pyspark.sql.functions as psf

def lognormal_skew(df, xvar = 'x'):
    df_agg = (df
              .groupBy('period')
              .agg(psf.stddev_pop(xvar).alias('sd'))
             )
    df_agg = df_agg.withColumn('skew', (psf.exp(psf.col('sd')) + 2)*psf.sqrt(psf.exp('sd') - 1))
    return df_agg

请注意,在psf 中存在不同的计算标准差的函数:我使用效率较低但报告总体水平方差的stddev_pop,而不是估计量(如果有 3 或 2 个点,估计量的精度将很穷)。

我们可以控制它产生所需的输出:

lognormal_skew(df).show(2)
+------+-----------------+------------------+
|period|               sd|              skew|
+------+-----------------+------------------+
|     b|              0.5| 2.938798148174726|
|     a|1.632993161855452|14.448897615797454|
+------+-----------------+------------------+

我们设法通过纯 DataFrame 逻辑获得了预期结果。

rdd

让我们将数据排列成一个rdd,它看起来像并行化的 numpy 数组:

rdd = df.rdd
rdd = rdd.mapValues(lambda l: l).map(lambda l: (l[1], [l[0]] )).reduceByKey(lambda x,y: x + y)
rdd.take(2)
[('b', [5, 6]), ('a', [1, 3, 5])]

这里我们使用reduceByKey 将值分组到一个列表中。在这一步,如果数据量很大,您的 RAM 可能会爆炸。

最后,您可以轻松地将函数与该结构并行:

rdd = rdd.map(lambda l: (l[0], np.array(l[1]))).map(lambda l: (l[0], lognormal_skew_numpy(l[1])))
rdd.take(2)
[('b', 2.938798148174726), ('a', 14.448897615797454)]

我们又得到了同样的结果。我发现这种方法有两个缺陷:

它的可读性和便携性较差。如果你想用不同的数据集重用代码,你将不得不做更多的工作 效率较低(速度和内存)。这里的reduceByKey 操作是主要瓶颈。

但是,您可以获得一些灵活性。这是一个权衡。

【讨论】:

以上是关于在 pandas udf pyspark 中使用 numpy的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark的pandas_udf中使用外部库

在 PySpark 中使用 pandas_udf 时无法填充数组

在 pyspark 中使用 pandas_udf 过滤数据框

在 pandas udf pyspark 中使用 numpy

为啥运行 pandas_udf 时 Pyspark 失败?

在pyspark中使用pandas udf/apache Arrow