在 pyspark 作业中发送和使用 virtualenv

Posted

技术标签:

【中文标题】在 pyspark 作业中发送和使用 virtualenv【英文标题】:Shipping and using virtualenv in a pyspark job 【发布时间】:2017-09-07 00:29:36 【问题描述】:

问题:我正在尝试将 spark-submit 脚本从本地机器运行到机器集群。集群完成的工作使用 numpy。我目前收到以下错误:

ImportError: 
Importing the multiarray numpy extension module failed.  Most
likely you are trying to import a failed build of numpy.
If you're working with a numpy git repo, try `git clean -xdf` (removes all
files not under version control).  Otherwise reinstall numpy.

Original error was: cannot import name multiarray

详细信息: 在我的本地环境中,我设置了一个 virtualenv,其中包括 numpy 以及我在项目和其他各种库中使用的私有 repo。我从 venv/lib/site-packages 的站点包目录创建了一个 zip 文件 (lib/libs.zip),其中“venv”是我的虚拟环境。我将此 zip 发送到远程节点。我用于执行 spark-submit 的 shell 脚本如下所示:

$SPARK_HOME/bin/spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --conf spark.pyspark.virtualenv.enabled=true  \
  --conf spark.pyspark.virtualenv.type=native \
  --conf spark.pyspark.virtualenv.requirements=$parent/requirements.txt \
  --conf spark.pyspark.virtualenv.bin.path=$parent/venv \
  --py-files "$parent/lib/libs.zip" \
  --num-executors 1 \
  --executor-cores 2 \
  --executor-memory 2G \
  --driver-memory 2G \
  $parent/src/features/pi.py

我还知道在远程节点上有一个 /usr/local/bin/python2.7 文件夹,其中包含一个 python 2.7 安装。

所以在我的 conf/spark-env.sh 我设置了以下内容:

export PYSPARK_PYTHON=/usr/local/bin/python2.7
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python2.7

当我运行脚本时,我得到了上面的错误。如果我屏幕打印 installed_distributions,我会得到一个零长度列表 []。我的私人图书馆也正确导入(这对我说它实际上是在访问我的 libs.zip 站点包。)。我的 pi.py 文件看起来像这样:

from myprivatelibrary.bigData.spark import spark_context
spark = spark_context()
import numpy as np
spark.parallelize(range(1, 10)).map(lambda x: np.__version__).collect()

期望/我的想法: 我希望这可以正确导入 numpy,尤其是因为我知道 numpy 在我的本地 virtualenv 中可以正常工作。我怀疑这是因为我实际上并没有使用安装在远程节点上的 virtualenv 中的 python 版本。我的问题是首先,如何解决这个问题,其次如何在远程节点上使用我的 virtualenv 安装的 python,而不是刚刚手动安装且当前位于这些机器上的 python?我看过一些关于这方面的文章,但坦率地说,它们写得不好。

【问题讨论】:

【参考方案1】:

使用--conf spark.pyspark.export PYSPARK_PYTHON=/usr/local/bin/python2.7,您可以为本地环境/驱动程序设置选项。要为集群(执行程序)设置选项,请使用以下语法:

--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON

此外,我想您应该创建您的virtualenv relocatable(不过,这是实验性的)。 <edit 20170908> 这意味着 virtualenv 使用相对链接而不是绝对链接。 </edit>

在这种情况下我们做了什么:我们通过 hdfs 发布了整个 anaconda 发行版。

<edit 20170908>

如果我们谈论的是不同的环境(MacOs 与 Linux,如下面的评论中提到的),您不能只提交一个 virtualenv,至少如果您的 virtualenv 包含带有二进制文件的包(如 numpy 的情况)。在这种情况下,我建议您自己创建一个“便携式”anaconda,即在 Linux VM 中安装 Anaconda 并对其进行压缩。

关于--archives--py-files

--py-files 将 python 文件/包添加到 python 路径。来自spark-submit documentation:

对于 Python 应用程序,只需传递 .py 文件代替 JAR,然后使用 --py-files 将 Python .zip、.egg 或 .py 文件添加到搜索路径。

--archives 表示将这些提取到每个执行器的工作目录中(仅限纱线集群)。

但是,在我看来,缺乏清晰的区别 - 参见例如 this SO post。

在给定的情况下,通过--archives 添加anaconda.zip,并通过--py-files 添加您的“其他python 文件”。

</edit>

另请参阅:Running Pyspark with Virtualenv,Henning Kropp 的博文。

【讨论】:

要为执行者设置PYSPARK_PYTHON,你应该使用spark.executorEnv.PYSPARK_PYTHON而不是spark.yarn.appMasterEnv.PYSPARK_PYTHON。根据spark文档spark.yarn.appMasterEnv实际上在yarn cluster模式下运行时管理驱动程序的环境,在yarn client模式下运行时只管理执行程序LAUNCHERS的环境。 @jkukul 在 Spark Docs 的注释中指出 When running Spark on YARN in cluster mode, environment variables need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName] property [...]。 OP 要求这样做(yarncluster 模式下)。这当然也适用于执行程序——我们(遗憾地)在集群上只有 Python2.7,但以这种方式分发 Python3.6(Anaconda)——Spark 无法在 Driver 和 Executors 上使用不同的 Python 版本。

以上是关于在 pyspark 作业中发送和使用 virtualenv的主要内容,如果未能解决你的问题,请参考以下文章

在 scala 中编写 udf 函数并在 pyspark 作业中使用它们

如何检查 Dataproc 上 pyspark 作业的每个执行程序/节点内存使用指标?

使用 PySpark 和 Step Functions 处理 Sagemaker 作业

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库

如何在 pyspark 的结构化流作业中运行地图转换