如何在 pyspark 中获取 Python 库?

Posted

技术标签:

【中文标题】如何在 pyspark 中获取 Python 库?【英文标题】:How do I get Python libraries in pyspark? 【发布时间】:2016-03-25 09:11:54 【问题描述】:

我想在 pyspark 中使用 matplotlib.bblpath 或 shapely.geometry 库。

当我尝试导入其中任何一个时,我收到以下错误:

>>> from shapely.geometry import polygon
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
ImportError: No module named shapely.geometry

我知道该模块不存在,但是如何将这些包带到我的 pyspark 库中?

【问题讨论】:

我想将它安装在 pyspark 中,而不是我的本地机器中。此命令在 pyspark shell 中不起作用。 这可能与 ***.com/q/29495435/1711188 重复 shipping python modules in pyspark to other nodes?的可能重复 【参考方案1】:

在 Spark 上下文中尝试使用:

SparkContext.addPyFile("module.py")  # also .zip

,引用自docs:

为要在此执行的所有任务添加 .py 或 .zip 依赖项 未来的 SparkContext。传递的路径可以是本地的 文件,HDFS(或其他 Hadoop 支持的文件系统)中的文件,或 HTTP、HTTPS 或 FTP URI。

【讨论】:

我可以添加这个依赖。当我进行火花提交时,有没有办法做到这一点。我正在执行 file.py 的 spark-submit,我应该在该文件中执行 addPyFile("module.py") 还是有办法通过向 spark-submit 命令添加参数来添加依赖项 从 Spark 文档 (spark.apache.org/docs/1.1.0/submitting-applications.html) 通过参数添加一个 py 文件(将其放在搜索路径中)似乎是可行的。但是我不知道 PySpark 的提交 API 是否有任何不同。 好的,我会在一个参数和我的文件中尝试它。两种方式都可以看到有效的方法。 有没有人以这种方式成功上传 .zip 文件?上传包时它对我不起作用,即使是那些没有依赖项的。 @ivan_bilan 晚了,但是......有一个类似的问题,让 addPyFile() 为我工作。请在此处查看完整帖子:***.com/q/51450462/8236733。该问题可以作为示例对您有所帮助,而答案至少可能是一个有用的调试步骤。【参考方案2】:

这就是我在我们的 AWS EMR 集群中的工作方式(在任何其他集群中也应该相同)。我创建了以下 shell 脚本并将其作为引导操作执行:

#!/bin/bash
# shapely installation
wget http://download.osgeo.org/geos/geos-3.5.0.tar.bz2
tar jxf geos-3.5.0.tar.bz2
cd geos-3.5.0 && ./configure --prefix=$HOME/geos-bin && make && make install
sudo cp /home/hadoop/geos-bin/lib/* /usr/lib
sudo /bin/sh -c 'echo "/usr/lib" >> /etc/ld.so.conf'
sudo /bin/sh -c 'echo "/usr/lib/local" >> /etc/ld.so.conf'
sudo /sbin/ldconfig
sudo /bin/sh -c 'echo -e "\nexport LD_LIBRARY_PATH=/usr/lib" >> /home/hadoop/.bashrc'
source /home/hadoop/.bashrc
sudo pip install shapely
echo "Shapely installation complete"
pip install https://pypi.python.org/packages/74/84/fa80c5e92854c7456b591f6e797c5be18315994afd3ef16a58694e1b5eb1/Geohash-1.0.tar.gz
#
exit 0

注意:该脚本可以在集群中的每个节点上独立执行,而不是作为引导操作运行。我已经测试了这两种情况。

以下是一个示例 pyspark 和 shapely 代码 (Spark SQL UDF),以确保上述命令按预期工作:

Python 2.7.10 (default, Dec  8 2015, 18:25:23) 
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.10 (default, Dec  8 2015 18:25:23)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> from shapely.wkt import loads as load_wkt
>>> def parse_region(region):
...     from shapely.wkt import loads as load_wkt
...     reverse_coordinate = lambda coord: ' '.join(reversed(coord.split(':')))
...     coordinate_list = map(reverse_coordinate, region.split(', '))
...     if coordinate_list[0] != coordinate_list[-1]:
...         coordinate_list.append(coordinate_list[0])
...     return str(load_wkt('POLYGON ((%s))' % ','.join(coordinate_list)).wkt)
... 
>>> udf_parse_region=udf(parse_region, StringType())
16/09/06 22:18:34 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/09/06 22:18:34 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
>>> df = sqlContext.sql('select id, bounds from <schema.table_name> limit 10')
>>> df2 = df.withColumn('bounds1', udf_parse_region('bounds'))
>>> df2.first()
Row(id=u'0089d43a-1b42-4fba-80d6-dda2552ee08e', bounds=u'33.42838509594465:-119.0533447265625, 33.39170168789402:-119.0203857421875, 33.29992542601392:-119.0478515625', bounds1=u'POLYGON ((-119.0533447265625 33.42838509594465, -119.0203857421875 33.39170168789402, -119.0478515625 33.29992542601392, -119.0533447265625 33.42838509594465))')
>>> 

谢谢, 侯赛因·博拉

【讨论】:

【参考方案3】:

这是在独立环境中(即笔记本电脑/台式机)还是在集群环境中(例如 AWS EMR)?

    如果在您的笔记本电脑/台式机上,pip install shapely 应该可以正常工作。您可能需要检查默认 Python 环境的环境变量。例如,如果您通常使用 Python 3,但将 Python 2 用于 pyspark,那么您将无法使用 pyspark。

    如果在AWS EMR等集群环境下,可以试试:

    import os
    
    def myfun(x):`
            os.system("pip install shapely")
            return x
    rdd = sc.parallelize([1,2,3,4]) ## assuming 4 worker nodes
    rdd.map(lambda x: myfun(x)).collect() 
    ## call each cluster to run the code to import the library
    

“我知道该模块不存在,但我想知道如何将这些包带到我的 pyspark 库中。”

在 EMR 上,如果您希望 pyspark 预先准备好您想要的任何其他库和配置,您可以使用引导步骤进行这些调整。除此之外,如果不在 Scala 中编译 Spark,就无法将库“添加”到 pyspark(如果您不熟悉 SBT,这样做会很痛苦)。

【讨论】:

这个问题是如果它正在使用中,则无法在节点3上安装包。 您可以在启动 EMR 时使用 bash 脚本(希望您在 AWS 上使用 EMR)来安装所有需要的库。这是“引导安装步骤” @user48956 在更新您需要的所有内容之前,您不得导入任何可能会更新的第三方包。【参考方案4】:

我从 AWS Docs 中找到了一个使用 SparkContext 的绝佳解决方案。我可以使用这个添加 Pandas 和其他包:

Using SparkContext to add packages to notebook with PySpark Kernel in EMR

sc.install_pypi_package("pandas==0.25.1")

【讨论】:

我认为 install_pypi_package 只会安装在驱动节点中,但这些库在工作人员中不可用

以上是关于如何在 pyspark 中获取 Python 库?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Pyspark / Python 数据集中先前计算的列中获取值

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

在 python / pyspark 中获取 k-means 质心和异常值

python, pyspark : 获取 pyspark 数据框列值的总和

pyspark 中使用集群未安装的python三方库:加载虚拟python环境

如何使用 pyspark 获取 Delta 表的所有当前分区?