pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致

Posted

技术标签:

【中文标题】pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致【英文标题】:Inconsistent scipiy.find_peaks results from pandas_udf with pyspark 3.0 【发布时间】:2020-07-08 13:02:28 【问题描述】:

我尝试在 pyspark 的 pandas_udf 中使用 scipy 的 find_peaks。一个准系统示例:

from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType 

import pandas as pd
import numpy as np

from scipy.signal import find_peaks

spark = SparkSession.builder.master("yarn") \
.appName("UDF_debug") \
.config("spark.yarn.dist.archives", "hdfs://PATH/TO/MY/USERFOLDER/envs/my_env.zip#MYENV")\
.config("spark.submit.deployMode", "client")\
.config("spark.yarn.queue", "root.dev")\
.enableHiveSupport()\
.getOrCreate()

# Create a sample dataframe and a corresponding pandas data frame, for cross-checking
df = spark.createDataFrame(
    [Row(id=1, c=3),
    Row(id=2,  c=6),
    Row(id=3,  c=2),
    Row(id=4,  c=9),
    Row(id=5,  c=7)])

dfp = df.toPandas()

def peak_finder(C: pd.Series) -> pd.Series:
     # Find peaks (maxima)
    pos_peaks, pos_properties = find_peaks(C)
    
    # Create an empty series of appropriate length
    r = pd.Series(np.full(len(C), np.nan))

    # Wherever a peak was found ...
    for idx in pos_peaks:
        # ... mark it by noting its height
        r[idx] = C[idx]

    return r

# Peak finding using pyspark's pandas_udf
peak_finder_udf = pandas_udf(peak_finder, returnType=DoubleType())
df = df.withColumn('peak', peak_finder_udf(df.c))
df.show()


# Peak finding directly on a pandas df
dfp["peaks_pandas"] = peak_finder(dfp["c"])
print(dfp)

两次打印的结果如下。首先,用 pandas_udf 找峰:

+---+---+----+
| id|  c|peak|
+---+---+----+
|  1|  3|null|
|  2|  6|null|
|  3|  2|null|
|  4|  9| 9.0|
|  5|  7|null|
+---+---+----+

其次,在边缘节点上只使用 stock pandas 和 numpy:

   id  c  peaks_pandas
0   1  3           NaN
1   2  6           6.0
2   3  2           NaN
3   4  9           9.0
4   5  7           NaN

id=2 的行不一致。

这可能从pyspark documentation 可以理解,说明:

在内部,PySpark 将通过拆分列来执行 Pandas UDF 成批次并为每个批次调用函数作为 数据,然后将结果连接在一起。

出现这么小的分裂似乎很奇怪,但也许......

问题 1:这种不一致的行为是预期的吗?我可以避免吗?

编辑:回答:是的,这是由于分区。请参阅下面的评论。

另一个奇怪的行为可能指向解决方案(但对我来说提出了更多问题)。继续上面的代码:

fname = "debug.parquet"
df.dropna().write.parquet(fname)
dfnew = spark.read.parquet(fname)
dfnew.show()

它产生结果

+---+---+----+
| id|  c|peak|
+---+---+----+
|  4|  9|null|
+---+---+----+

峰值不再是应该的 = 9,而是空值。

问题 2:谁能解释一下保存过程中的数据丢失情况?

conda env 中的相关包:

# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                        main    defaults
arrow-cpp                 0.15.1           py38h7cd5009_5    defaults
attrs                     19.3.0                     py_0    defaults
backcall                  0.2.0                      py_0    defaults
blas                      1.0                         mkl    defaults
bleach                    3.1.5                      py_0    defaults
boost-cpp                 1.71.0               h7b6447c_0    defaults
brotli                    1.0.7                he6710b0_0    defaults
brotlipy                  0.7.0           py38h7b6447c_1000    defaults
bzip2                     1.0.8                h7b6447c_0    defaults
c-ares                    1.15.0            h7b6447c_1001    defaults
ca-certificates           2020.6.24                     0    defaults
certifi                   2020.6.20                py38_0    defaults
cffi                      1.14.0           py38he30daa8_1    defaults
chardet                   3.0.4                 py38_1003    defaults
cryptography              2.9.2            py38h1ba5d50_0    defaults
dbus                      1.13.16              hb2f20db_0    defaults
decorator                 4.4.2                      py_0    defaults
defusedxml                0.6.0                      py_0    defaults
double-conversion         3.1.5                he6710b0_1    defaults
entrypoints               0.3                      py38_0    defaults
expat                     2.2.9                he6710b0_2    defaults
fontconfig                2.13.0               h9420a91_0    defaults
freetype                  2.10.2               h5ab3b9f_0    defaults
gflags                    2.2.2                he6710b0_0    defaults
glib                      2.65.0               h3eb4bd4_0    defaults
glog                      0.4.0                he6710b0_0    defaults
grpc-cpp                  1.26.0               hf8bcb03_0    defaults
gst-plugins-base          1.14.0               hbbd80ab_1    defaults
gstreamer                 1.14.0               hb31296c_0    defaults
icu                       58.2                 he6710b0_3    defaults
idna                      2.10                       py_0    defaults
importlib-metadata        1.7.0                    py38_0    defaults
importlib_metadata        1.7.0                         0    defaults
intel-openmp              2020.1                      217    defaults
ipykernel                 5.3.0            py38h5ca1d4c_0    defaults
ipython                   7.16.1           py38h5ca1d4c_0    defaults
ipython_genutils          0.2.0                    py38_0    defaults
ipywidgets                7.5.1                      py_0    defaults
jedi                      0.17.1                   py38_0    defaults
jinja2                    2.11.2                     py_0    defaults
jpeg                      9b                   h024ee3a_2    defaults
json5                     0.9.5                      py_0    defaults
jsonschema                3.2.0                    py38_0    defaults
jupyter                   1.0.0                    py38_7    defaults
jupyter_client            6.1.3                      py_0    defaults
jupyter_console           6.1.0                      py_0    defaults
jupyter_core              4.6.3                    py38_0    defaults
jupyterlab                2.1.5                      py_0    defaults
jupyterlab_server         1.1.5                      py_0    defaults
ld_impl_linux-64          2.33.1               h53a641e_7    defaults
libboost                  1.71.0               h97c9712_0    defaults
libedit                   3.1.20191231         h7b6447c_0    defaults
libevent                  2.1.8                h1ba5d50_0    defaults
libffi                    3.3                  he6710b0_2    defaults
libgcc-ng                 9.1.0                hdf63c60_0    defaults
libgfortran-ng            7.3.0                hdf63c60_0    defaults
libpng                    1.6.37               hbc83047_0    defaults
libprotobuf               3.11.4               hd408876_0    defaults
libsodium                 1.0.18               h7b6447c_0    defaults
libstdcxx-ng              9.1.0                hdf63c60_0    defaults
libuuid                   1.0.3                h1bed415_2    defaults
libxcb                    1.14                 h7b6447c_0    defaults
libxml2                   2.9.10               he19cac6_1    defaults
lz4-c                     1.8.1.2              h14c3975_0    defaults
markupsafe                1.1.1            py38h7b6447c_0    defaults
mistune                   0.8.4           py38h7b6447c_1000    defaults
mkl                       2020.1                      217    defaults
mkl-service               2.3.0            py38he904b0f_0    defaults
mkl_fft                   1.1.0            py38h23d657b_0    defaults
mkl_random                1.1.1            py38h0573a6f_0    defaults
nbconvert                 5.6.1                    py38_0    defaults
nbformat                  5.0.7                      py_0    defaults
ncurses                   6.2                  he6710b0_1    defaults
notebook                  6.0.3                    py38_0    defaults
numpy                     1.18.5           py38ha1c710e_0    defaults
numpy-base                1.18.5           py38hde5b4d6_0    defaults
openssl                   1.1.1g               h7b6447c_0    defaults
packaging                 20.4                       py_0    defaults
pandas                    1.0.5            py38h0573a6f_0    defaults
pandoc                    2.9.2.1                       0    defaults
pandocfilters             1.4.2                    py38_1    defaults
parso                     0.7.0                      py_0    defaults
pcre                      8.44                 he6710b0_0    defaults
pexpect                   4.8.0                    py38_0    defaults
pickleshare               0.7.5                 py38_1000    defaults
pip                       20.1.1                   py38_1    defaults
prometheus_client         0.8.0                      py_0    defaults
prompt-toolkit            3.0.5                      py_0    defaults
prompt_toolkit            3.0.5                         0    defaults
ptyprocess                0.6.0                    py38_0    defaults
py4j                      0.10.9                     py_0    defaults
pyarrow                   0.15.1           py38h0573a6f_0    defaults
pycparser                 2.20                       py_0    defaults
pygments                  2.6.1                      py_0    defaults
pyopenssl                 19.1.0                   py38_0    defaults
pyparsing                 2.4.7                      py_0    defaults
pyqt                      5.9.2            py38h05f1152_4    defaults
pyrsistent                0.16.0           py38h7b6447c_0    defaults
pysocks                   1.7.1                    py38_0    defaults
pyspark                   3.0.0                      py_0    defaults
python                    3.8.3                hcff3b4d_2    defaults
python-dateutil           2.8.1                      py_0    defaults
pytz                      2020.1                     py_0    defaults
pyzmq                     19.0.1           py38he6710b0_1    defaults
qt                        5.9.7                h5867ecd_1    defaults
qtconsole                 4.7.5                      py_0    defaults
qtpy                      1.9.0                      py_0    defaults
re2                       2019.08.01           he6710b0_0    defaults
readline                  8.0                  h7b6447c_0    defaults
requests                  2.24.0                     py_0    defaults
scipy                     1.5.0            py38h0b6359f_0    defaults
send2trash                1.5.0                    py38_0    defaults
setuptools                47.3.1                   py38_0    defaults
sip                       4.19.13          py38he6710b0_0    defaults
six                       1.15.0                     py_0    defaults
snappy                    1.1.8                he6710b0_0    defaults
sqlite                    3.32.3               h62c20be_0    defaults
terminado                 0.8.3                    py38_0    defaults
testpath                  0.4.4                      py_0    defaults
thrift-cpp                0.11.0               h02b749d_3    defaults
tk                        8.6.10               hbc83047_0    defaults
tornado                   6.0.4            py38h7b6447c_1    defaults
traitlets                 4.3.3                    py38_0    defaults
uriparser                 0.9.3                he6710b0_1    defaults
urllib3                   1.25.9                     py_0    defaults
wcwidth                   0.2.5                      py_0    defaults
webencodings              0.5.1                    py38_1    defaults
wheel                     0.34.2                   py38_0    defaults
widgetsnbextension        3.5.1                    py38_0    defaults
xz                        5.2.5                h7b6447c_0    defaults
zeromq                    4.3.2                he6710b0_2    defaults
zipp                      3.1.0                      py_0    defaults
zlib                      1.2.11               h7b6447c_3    defaults
zstd                      1.3.7                h0b5b093_0    defaults

我也尝试过使用 pyspark 2.4.5(结合 pyarrow 0.8)。结果完全相同。

【问题讨论】:

回答了问题 1:我刚刚发现确实是分区导致了这种行为。我只是通过在 peak_finder 函数中分配一个随机数来做到这一点。 为问题 2 找到了一种解决方法:先转换做 rdd 然后立即返回数据框解决了问题(即添加 .rdd.toDF())。 【参考方案1】:

问题 1:不一致的行为确实是由于分区造成的。

问题 2:找到解决方法:先转换为 rdd,然后立即返回数据框解决了问题(即添加 .rdd.toDF())。我不清楚原因,可能是我不明白的背景发生了什么。

【讨论】:

以上是关于pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致的主要内容,如果未能解决你的问题,请参考以下文章

为啥运行 pandas_udf 时 Pyspark 失败?

PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?

在pyspark的pandas_udf中使用外部库