具有分布式集群的 Python 多处理
Posted
技术标签:
【中文标题】具有分布式集群的 Python 多处理【英文标题】:Python Multiprocessing with Distributed Cluster 【发布时间】:2015-01-08 17:03:08 【问题描述】:我正在寻找一个 python 包,它不仅可以在单台计算机中跨不同内核进行多处理,还可以在分布在多台机器上的集群中进行多处理。有很多不同的用于分布式计算的 Python 包,但大多数似乎都需要更改代码才能运行(例如,指示对象位于远程计算机上的前缀)。具体来说,我想要尽可能接近多处理pool.map
函数的东西。因此,例如,如果在一台机器上,脚本是:
from multiprocessing import Pool
pool = Pool(processes = 8)
resultlist = pool.map(function, arglist)
那么分布式集群的伪代码将是:
from distprocess import Connect, Pool, Cluster
pool1 = Pool(processes = 8)
c = Connect(ipaddress)
pool2 = c.Pool(processes = 4)
cluster = Cluster([pool1, pool2])
resultlist = cluster.map(function, arglist)
【问题讨论】:
可能需要比您想要的更多的设置,但您可以查看 celery 的分布式任务队列。 docs.celeryproject.org/en/latest/getting-started/… 我会去看看jug 我可能最终会使用 celery,但它需要进行大量设置,并且帮助文件很难遵循(脱节,而不是清晰的分步说明以及整个脚本的副本)结束)。 Jug 的文档讨论并行化,但不讨论跨不同计算机的并行化。 还有一个非常全面的解决方案列表:wiki.python.org/moin/ParallelProcessing 另外值得注意的是,有一个更新的解决方案类似于pathos
——一个名为dask
的包。
【参考方案1】:
如果您想要一个非常简单的解决方案,那么没有。
但是,有一个解决方案具有multiprocessing
接口——pathos
——它能够通过并行映射建立与远程服务器的连接,并进行多处理。
如果您想建立 ssh 隧道连接,您可以这样做……或者如果您可以使用不太安全的方法,您也可以这样做。
>>> # establish a ssh tunnel
>>> from pathos.core import connect
>>> tunnel = connect('remote.computer.com', port=1234)
>>> tunnel
Tunnel('-q -N -L55774:remote.computer.com:1234 remote.computer.com')
>>> tunnel._lport
55774
>>> tunnel._rport
1234
>>>
>>> # define some function to run in parallel
>>> def sleepy_squared(x):
... from time import sleep
... sleep(1.0)
... return x**2
...
>>> # build a pool of servers and execute the parallel map
>>> from pathos.pp import ParallelPythonPool as Pool
>>> p = Pool(8, servers=('localhost:55774',))
>>> p.servers
('localhost:55774',)
>>> y = p.map(sleepy_squared, x)
>>> y
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
或者,您可以配置为直接连接(无 ssh)
>>> p = Pool(8, servers=('remote.computer.com:5678',))
# use an asynchronous parallel map
>>> res = p.amap(sleepy_squared, x)
>>> res.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
这有点挑剔,要使远程服务器正常工作,您必须事先在指定端口上启动运行在remote.computer.com
上的服务器——并且您必须确保本地主机和远程服务器上的设置主机将允许直接连接或 ssh 隧道连接。另外,您需要在每台主机上运行相同版本的pathos
和pathos
fork 的pp
。此外,对于 ssh,您需要运行 ssh-agent 以允许使用 ssh 进行无密码登录。
但是,希望一切正常……如果您的功能代码可以使用dill.source.importable
传输到远程主机。
仅供参考,pathos
早就应该发布了,基本上,在新的稳定版本被删除之前,有一些错误和界面更改需要解决。
【讨论】:
我应该提到我是pathos
作者。
我强烈建议您从头到尾创建详细的设置指南,以便在服务器上运行 server.py 文件和客户端上运行 client.py 文件后,客户端可以实际访问服务器并运行跨客户端和服务器池化的作业。阅读此答案和您对我的其他问题的回答后,我仍然不确定如何 (a) 设置服务器或 (b) 与服务器建立安全连接。
我不知道该怎么做(a)。设置服务器是否意味着如果 SSH 身份验证正确,则有一台将运行 python 的服务器?我相信您假设 (b) 是由 openSSH 在 python 之外处理的?在您提供的示例中,您似乎建立了连接,但是tunnel
对象不再使用,remote.computer.com
不包含在您创建的下一个池中。它在“相反您可以配置为直接连接(无 ssh)”中被引用,但我真的不明白它是如何工作的,因为没有 SSH 我如何能够对服务器进行身份验证?
创建隧道时,隧道会将本地端口链接到远程端口。因此,您的计算机只需将所有请求发送到本地端口,隧道就会使用 SSH 将其通过管道传输到远程服务器。您只需要 SSH 来建立隧道,所以只需要调用它一次。从那时起,您可以通过与您自己的本地端口通信来通过安全隧道传输不安全的通信。如果您不使用隧道,则必须告诉池连接到远程服务器。查看一些有关 ssh 隧道如何工作的文档。 Pathos 只是为您设置了一个。
如果您使用pp
与远程服务器通信,您需要在远程主机上运行ppserver
。如果您正在使用其他东西(zmq,...),那么您需要运行该类型的服务器。 Pathos 确实有一些代码可以在远程主机上为您启动服务器,但它并不完全健壮,因为您需要保存 jobid 引用以将其关闭,否则您需要登录并找出正在运行的作业是你的服务器。您也可以使用 pathos 远程执行此操作,但如果您不习惯杀死 unix 进程,这并不是您真正想要的。【参考方案2】:
我建议看看 Ray,它的目的就是为了做到这一点。
Ray 在单机多核设置中使用与在分布式设置中相同的语法来并行化代码。如果您愿意使用 for 循环而不是 map 调用,那么您的示例将如下所示。
import ray
import time
ray.init()
@ray.remote
def function(x):
time.sleep(0.1)
return x
arglist = [1, 2, 3, 4]
result_ids = [function.remote(x) for x in arglist]
resultlist = ray.get(result_ids)
这将使用您在本地拥有的许多内核并行运行四个任务。要在集群上运行相同的示例,唯一会改变的行是对ray.init()
的调用。相关文档可以在here找到。
请注意,我正在帮助开发 Ray。
【讨论】:
【参考方案3】:在这里聚会有点晚了,但由于我也在寻找类似的解决方案,而这个问题仍未标记为已回答,我想我会贡献我的发现。
我最终使用了SCOOP。它提供了一个并行映射实现,可以跨多个内核、跨多个主机工作。如果在调用期间需要,它还可以回退到 Python 的串行 map
函数。
从SCOOP的介绍页面中,它引用了以下特点:
SCOOP 相对于期货、多处理和 类似的模块如下:
通过网络利用多台计算机的功能; 能够在一个任务中生成多个任务; API 与PEP-3148 兼容; 只需少量修改即可并行化串行代码; 高效的负载平衡。
它确实有一些怪癖(函数/类必须是可腌制的),如果它们不都共享相同的文件系统架构,那么让事情在多个主机上顺利运行的设置可能会很乏味,但总的来说我很高兴结果。为了我们的目的,做了很多 Numpy 和 Cython,它提供了出色的性能。
希望这会有所帮助。
【讨论】:
SCOOP
是一个比pathos
功能更弱但支持更好的软件包……但是,这仍然是一个不错的选择。据我所知,pathos
和 SCOOP
是仅有的两个提供分层并行/分布式地图的此类包。
感谢您的建议。你能多说一点/你有让多个节点运行的示例代码吗?这个链接是我能找到的最接近实际设置它并在多台机器上运行的东西,但它严重不足。 scoop.readthedocs.org/en/0.7/install.html#remote-usage
@MikeMcKerns,我也看过 Apache Spark。你能解释一下这与 pathos(或 SCOOP)有何不同吗?
@迈克尔。设置多个节点并不是那么棘手。 [此处] (scoop.readthedocs.org/en/0.7/…) 的文档以 Hostfile 格式涵盖了这一点。但是,确保主(代理)和所有远程主机具有完全相同的目录布局、代码、依赖项以及对任何外部数据的访问,这将有助于使事情变得更容易,并为您节省大量的调试时间。在我们的案例中,在启动之前将数据和代码同步到所有工作主机是有意义的。 SCOOP 社区很小,但也很有帮助。
@MikeMcKerns:我们在 SCOOP 上用 literally 两行代码并行化了我们的代码。一行用于导入语句 (from scoop import futures
),另一行将 Python 的内置串行映射替换为 SCOOP 的映射 (futures.map(func,arraydata)
)。再简单不过了。【参考方案4】:
你看过disco吗?
特点:
Map / Reduce 范例 Python 编程 分布式共享磁盘 ssh 底层传输 Web 和控制台界面 轻松添加/阻止/删除节点 master 在没有用户干预的情况下启动 slaves 节点 slave 节点在发生故障时会自动重启 很好的文档。在Install Guide 之后,我能够在几分钟内启动一个 2 机集群(我唯一需要做的就是创建 $DISCO_HOME/root 文件夹以连接到 WebUI,我猜是由于创建日志文件错误)。迪斯科文档中的一个简单示例:
from disco.core import Job, result_iterator
def map(line, params):
for word in line.split():
yield word, 1
def reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
if __name__ == '__main__':
job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
map=map,
reduce=reduce)
for word, count in result_iterator(job.wait(show=True)):
print(word, count)
【讨论】:
以上是关于具有分布式集群的 Python 多处理的主要内容,如果未能解决你的问题,请参考以下文章