如何在 Apache Spark (pyspark) 中使用自定义类?

Posted

技术标签:

【中文标题】如何在 Apache Spark (pyspark) 中使用自定义类?【英文标题】:How to use custom classes with Apache Spark (pyspark)? 【发布时间】:2015-09-14 14:07:01 【问题描述】:

我已经编写了一个在 python 中实现分类器的类。我想使用 Apache Spark 使用这个分类器并行化大量数据点的分类。

    我在一个有 10 个从属服务器的集群上使用 Amazon EC2 进行设置,基于一个带有 python 的 Anaconda 发行版的 ami。 ami 让我可以远程使用 IPython Notebook。 我已经在 /root/anaconda/lib/python2.7/ 文件夹中的 master 上的一个名为 BoTree.py 的文件中定义了 BoTree 类,这是我所有 python 模块所在的位置 我检查了在从 master 运行命令行 spark 时我可以导入和使用 BoTree.py(我只需从编写 import BoTree 开始,我的类 BoTree 就可用了 我使用 spark 的 /root/spark-ec2/copy-dir.sh 脚本在我的集群中复制 /python2.7/ 目录。 我已经 ssh-ed 到其中一个从属服务器并尝试在那里运行 ipython,并且能够导入 BoTree,所以我认为该模块已成功通过集群发送(我还可以看到 BoTree.py 文件在.../python2.7/ 文件夹) 在我检查过的主服务器上,我可以使用 cPickle 腌制和取消腌制 BoTree 实例,据我所知,这是 pyspark 的序列化程序。

但是,当我执行以下操作时:

import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()

Spark 因错误而失败(我认为只是相关位):

  File "/root/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/root/spark/python/pyspark/serializers.py", line 405, in loads
    return cPickle.loads(obj)
ImportError: No module named BoroughTree

谁能帮帮我?有点绝望……

谢谢

【问题讨论】:

【参考方案1】:

获得 SparkContext 后,还可以使用addPyFile 随后将模块发送给每个工作人员。

sc.addPyFile('/path/to/BoTree.py')

pyspark.SparkContext.addPyFile(path) documentation

【讨论】:

这太棒了,正是我想要的……干杯。【参考方案2】:

可能最简单的解决方案是在创建SparkContext 时使用pyFiles 参数

from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])

放置在那里的每个文件都将发送给工作人员并添加到PYTHONPATH

如果您在交互模式下工作,则必须在创建新上下文之前使用 sc.stop() 停止现有上下文。

还要确保 Spark worker 实际使用的是 Anaconda 发行版,而不是默认的 Python 解释器。根据您的描述,很可能是问题所在。要设置PYSPARK_PYTHON,您可以使用conf/spark-env.sh 文件。

顺便说一句,将文件复制到lib 是一个相当混乱的解决方案。如果您想避免使用 pyFiles 推送文件,我建议您创建纯 Python 包或 Conda 包并正确安装。通过这种方式,您可以轻松跟踪已安装的内容、删除不必要的软件包并避免一些难以调试的问题。

【讨论】:

谢谢。我以交互方式使用 python,所以我无法设置 SparkContext。在这种情况下,我该如何做与 pyFiles 等效的操作?我定义了一个导入 sys 然后返回 sys.executable 的函数。我认为这告诉我我所有的奴隶都在运行 Anaconda。但是,如果我 ssh 进入它们,我可以看到环境变量 PYSPARK_PYTHON 未设置。如何在我的奴隶上编辑 PYTHONPATH? 其实你可以在交互模式下创建一个SparkContext。有关此和 PYSPARK_PYTHON 变量的一些详细信息,请参阅更新的答案

以上是关于如何在 Apache Spark (pyspark) 中使用自定义类?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Spark (pyspark) 中使用自定义类?

如何有效地将 MySQL 表读入 Apache Spark/PySpark?

如何在 Pyspark 中启用 Apache Arrow

如何在 Apache livy 中提交 pyspark 作业?

Apache Spark:启动 PySpark 时出错

Apache Spark (PySpark) 在读取 CSV 时处理空值