pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致
Posted
技术标签:
【中文标题】pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致【英文标题】:Inconsistent scipiy.find_peaks results from pandas_udf with pyspark 3.0 【发布时间】:2020-07-08 13:02:28 【问题描述】:我尝试在 pyspark 的 pandas_udf 中使用 scipy 的 find_peaks。一个准系统示例:
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType
import pandas as pd
import numpy as np
from scipy.signal import find_peaks
spark = SparkSession.builder.master("yarn") \
.appName("UDF_debug") \
.config("spark.yarn.dist.archives", "hdfs://PATH/TO/MY/USERFOLDER/envs/my_env.zip#MYENV")\
.config("spark.submit.deployMode", "client")\
.config("spark.yarn.queue", "root.dev")\
.enableHiveSupport()\
.getOrCreate()
# Create a sample dataframe and a corresponding pandas data frame, for cross-checking
df = spark.createDataFrame(
[Row(id=1, c=3),
Row(id=2, c=6),
Row(id=3, c=2),
Row(id=4, c=9),
Row(id=5, c=7)])
dfp = df.toPandas()
def peak_finder(C: pd.Series) -> pd.Series:
# Find peaks (maxima)
pos_peaks, pos_properties = find_peaks(C)
# Create an empty series of appropriate length
r = pd.Series(np.full(len(C), np.nan))
# Wherever a peak was found ...
for idx in pos_peaks:
# ... mark it by noting its height
r[idx] = C[idx]
return r
# Peak finding using pyspark's pandas_udf
peak_finder_udf = pandas_udf(peak_finder, returnType=DoubleType())
df = df.withColumn('peak', peak_finder_udf(df.c))
df.show()
# Peak finding directly on a pandas df
dfp["peaks_pandas"] = peak_finder(dfp["c"])
print(dfp)
两次打印的结果如下。首先,用 pandas_udf 找峰:
+---+---+----+
| id| c|peak|
+---+---+----+
| 1| 3|null|
| 2| 6|null|
| 3| 2|null|
| 4| 9| 9.0|
| 5| 7|null|
+---+---+----+
其次,在边缘节点上只使用 stock pandas 和 numpy:
id c peaks_pandas
0 1 3 NaN
1 2 6 6.0
2 3 2 NaN
3 4 9 9.0
4 5 7 NaN
id=2 的行不一致。
这可能从pyspark documentation 可以理解,说明:
在内部,PySpark 将通过拆分列来执行 Pandas UDF 成批次并为每个批次调用函数作为 数据,然后将结果连接在一起。
出现这么小的分裂似乎很奇怪,但也许......
问题 1:这种不一致的行为是预期的吗?我可以避免吗?
编辑:回答:是的,这是由于分区。请参阅下面的评论。
另一个奇怪的行为可能指向解决方案(但对我来说提出了更多问题)。继续上面的代码:
fname = "debug.parquet"
df.dropna().write.parquet(fname)
dfnew = spark.read.parquet(fname)
dfnew.show()
它产生结果
+---+---+----+
| id| c|peak|
+---+---+----+
| 4| 9|null|
+---+---+----+
峰值不再是应该的 = 9,而是空值。
问题 2:谁能解释一下保存过程中的数据丢失情况?
conda env 中的相关包:
# Name Version Build Channel
_libgcc_mutex 0.1 main defaults
arrow-cpp 0.15.1 py38h7cd5009_5 defaults
attrs 19.3.0 py_0 defaults
backcall 0.2.0 py_0 defaults
blas 1.0 mkl defaults
bleach 3.1.5 py_0 defaults
boost-cpp 1.71.0 h7b6447c_0 defaults
brotli 1.0.7 he6710b0_0 defaults
brotlipy 0.7.0 py38h7b6447c_1000 defaults
bzip2 1.0.8 h7b6447c_0 defaults
c-ares 1.15.0 h7b6447c_1001 defaults
ca-certificates 2020.6.24 0 defaults
certifi 2020.6.20 py38_0 defaults
cffi 1.14.0 py38he30daa8_1 defaults
chardet 3.0.4 py38_1003 defaults
cryptography 2.9.2 py38h1ba5d50_0 defaults
dbus 1.13.16 hb2f20db_0 defaults
decorator 4.4.2 py_0 defaults
defusedxml 0.6.0 py_0 defaults
double-conversion 3.1.5 he6710b0_1 defaults
entrypoints 0.3 py38_0 defaults
expat 2.2.9 he6710b0_2 defaults
fontconfig 2.13.0 h9420a91_0 defaults
freetype 2.10.2 h5ab3b9f_0 defaults
gflags 2.2.2 he6710b0_0 defaults
glib 2.65.0 h3eb4bd4_0 defaults
glog 0.4.0 he6710b0_0 defaults
grpc-cpp 1.26.0 hf8bcb03_0 defaults
gst-plugins-base 1.14.0 hbbd80ab_1 defaults
gstreamer 1.14.0 hb31296c_0 defaults
icu 58.2 he6710b0_3 defaults
idna 2.10 py_0 defaults
importlib-metadata 1.7.0 py38_0 defaults
importlib_metadata 1.7.0 0 defaults
intel-openmp 2020.1 217 defaults
ipykernel 5.3.0 py38h5ca1d4c_0 defaults
ipython 7.16.1 py38h5ca1d4c_0 defaults
ipython_genutils 0.2.0 py38_0 defaults
ipywidgets 7.5.1 py_0 defaults
jedi 0.17.1 py38_0 defaults
jinja2 2.11.2 py_0 defaults
jpeg 9b h024ee3a_2 defaults
json5 0.9.5 py_0 defaults
jsonschema 3.2.0 py38_0 defaults
jupyter 1.0.0 py38_7 defaults
jupyter_client 6.1.3 py_0 defaults
jupyter_console 6.1.0 py_0 defaults
jupyter_core 4.6.3 py38_0 defaults
jupyterlab 2.1.5 py_0 defaults
jupyterlab_server 1.1.5 py_0 defaults
ld_impl_linux-64 2.33.1 h53a641e_7 defaults
libboost 1.71.0 h97c9712_0 defaults
libedit 3.1.20191231 h7b6447c_0 defaults
libevent 2.1.8 h1ba5d50_0 defaults
libffi 3.3 he6710b0_2 defaults
libgcc-ng 9.1.0 hdf63c60_0 defaults
libgfortran-ng 7.3.0 hdf63c60_0 defaults
libpng 1.6.37 hbc83047_0 defaults
libprotobuf 3.11.4 hd408876_0 defaults
libsodium 1.0.18 h7b6447c_0 defaults
libstdcxx-ng 9.1.0 hdf63c60_0 defaults
libuuid 1.0.3 h1bed415_2 defaults
libxcb 1.14 h7b6447c_0 defaults
libxml2 2.9.10 he19cac6_1 defaults
lz4-c 1.8.1.2 h14c3975_0 defaults
markupsafe 1.1.1 py38h7b6447c_0 defaults
mistune 0.8.4 py38h7b6447c_1000 defaults
mkl 2020.1 217 defaults
mkl-service 2.3.0 py38he904b0f_0 defaults
mkl_fft 1.1.0 py38h23d657b_0 defaults
mkl_random 1.1.1 py38h0573a6f_0 defaults
nbconvert 5.6.1 py38_0 defaults
nbformat 5.0.7 py_0 defaults
ncurses 6.2 he6710b0_1 defaults
notebook 6.0.3 py38_0 defaults
numpy 1.18.5 py38ha1c710e_0 defaults
numpy-base 1.18.5 py38hde5b4d6_0 defaults
openssl 1.1.1g h7b6447c_0 defaults
packaging 20.4 py_0 defaults
pandas 1.0.5 py38h0573a6f_0 defaults
pandoc 2.9.2.1 0 defaults
pandocfilters 1.4.2 py38_1 defaults
parso 0.7.0 py_0 defaults
pcre 8.44 he6710b0_0 defaults
pexpect 4.8.0 py38_0 defaults
pickleshare 0.7.5 py38_1000 defaults
pip 20.1.1 py38_1 defaults
prometheus_client 0.8.0 py_0 defaults
prompt-toolkit 3.0.5 py_0 defaults
prompt_toolkit 3.0.5 0 defaults
ptyprocess 0.6.0 py38_0 defaults
py4j 0.10.9 py_0 defaults
pyarrow 0.15.1 py38h0573a6f_0 defaults
pycparser 2.20 py_0 defaults
pygments 2.6.1 py_0 defaults
pyopenssl 19.1.0 py38_0 defaults
pyparsing 2.4.7 py_0 defaults
pyqt 5.9.2 py38h05f1152_4 defaults
pyrsistent 0.16.0 py38h7b6447c_0 defaults
pysocks 1.7.1 py38_0 defaults
pyspark 3.0.0 py_0 defaults
python 3.8.3 hcff3b4d_2 defaults
python-dateutil 2.8.1 py_0 defaults
pytz 2020.1 py_0 defaults
pyzmq 19.0.1 py38he6710b0_1 defaults
qt 5.9.7 h5867ecd_1 defaults
qtconsole 4.7.5 py_0 defaults
qtpy 1.9.0 py_0 defaults
re2 2019.08.01 he6710b0_0 defaults
readline 8.0 h7b6447c_0 defaults
requests 2.24.0 py_0 defaults
scipy 1.5.0 py38h0b6359f_0 defaults
send2trash 1.5.0 py38_0 defaults
setuptools 47.3.1 py38_0 defaults
sip 4.19.13 py38he6710b0_0 defaults
six 1.15.0 py_0 defaults
snappy 1.1.8 he6710b0_0 defaults
sqlite 3.32.3 h62c20be_0 defaults
terminado 0.8.3 py38_0 defaults
testpath 0.4.4 py_0 defaults
thrift-cpp 0.11.0 h02b749d_3 defaults
tk 8.6.10 hbc83047_0 defaults
tornado 6.0.4 py38h7b6447c_1 defaults
traitlets 4.3.3 py38_0 defaults
uriparser 0.9.3 he6710b0_1 defaults
urllib3 1.25.9 py_0 defaults
wcwidth 0.2.5 py_0 defaults
webencodings 0.5.1 py38_1 defaults
wheel 0.34.2 py38_0 defaults
widgetsnbextension 3.5.1 py38_0 defaults
xz 5.2.5 h7b6447c_0 defaults
zeromq 4.3.2 he6710b0_2 defaults
zipp 3.1.0 py_0 defaults
zlib 1.2.11 h7b6447c_3 defaults
zstd 1.3.7 h0b5b093_0 defaults
我也尝试过使用 pyspark 2.4.5(结合 pyarrow 0.8)。结果完全相同。
【问题讨论】:
回答了问题 1:我刚刚发现确实是分区导致了这种行为。我只是通过在 peak_finder 函数中分配一个随机数来做到这一点。 为问题 2 找到了一种解决方法:先转换做 rdd 然后立即返回数据框解决了问题(即添加 .rdd.toDF())。 【参考方案1】:问题 1:不一致的行为确实是由于分区造成的。
问题 2:找到解决方法:先转换为 rdd,然后立即返回数据框解决了问题(即添加 .rdd.toDF())。我不清楚原因,可能是我不明白的背景发生了什么。
【讨论】:
以上是关于pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致的主要内容,如果未能解决你的问题,请参考以下文章
PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列
如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?
如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?