Python中的分布式多处理池

Posted

技术标签:

【中文标题】Python中的分布式多处理池【英文标题】:Distributed multiprocessing pool in Python 【发布时间】:2016-07-10 02:30:16 【问题描述】:

我有一段现有的 Python 代码在我的机器的内核上并行运行。它完成的工作基本上是打开一个输入文件,读取内容,执行一些相当繁重的数学运算,将结果写入输出文件,在 for 循环中获取下一个文件并再次执行。为了在多个内核上实现并行,我使用了multiprocessing 库中的Pool 函数。举个简单的例子:

import multiprocessing
import time

data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(8)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

这个例子只是用来展示我是如何在 8 个内核上实现 multiprocessing.Pool 函数的。本质上,我的代码中的 mp_worker 函数要复杂得多,但你明白我的意思。

我开始意识到我正在处理的网络中有几台机器在 99% 的时间里都处于空闲状态。因此,我想知道是否有办法在这段代码中利用他们的核心以及我的本地核心。

在伪代码中,代码可能变成这样:

def mp_handler():
    p = multiprocessing.Pool(servers=['local host', 192.168.0.1, 192.168.0.2], ncores=[8,8,4])
    p.map(mp_worker, data)

我现在可以将我的本地计算机和其他 IP 地址指定为服务器,以及我想在每台计算机上使用的内核数。

由于我网络上的其他机器归我所有且未连接互联网,因此出于安全目的我不会担心使用 SSH。

谷歌搜索我注意到pathosscoop 库可能可以帮助我解决这个问题。看起来pathos 的命令与multiprocessing 库非常相似,这对我很有吸引力。但是,在这两种情况下,我都找不到一个简单的示例来展示如何将本地并行作业转换为分布式并行作业。我渴望尽可能接近multiprocessing 库的池/地图功能。

任何帮助或示例将不胜感激!

【问题讨论】:

签出wiki.python.org/moin/ParallelProcessing 【参考方案1】:

pathos 的例子很像你的伪代码。

from pathos.parallel import stats
from pathos.parallel import ParallelPool as Pool
pool = Pool()

def host(id):
    import socket
    import time
    time.sleep(1.0)
    return "Rank: %d -- %s" % (id, socket.gethostname())


print "Evaluate 10 items on 2 cpus"
pool.ncpus = 2
pool.servers = ('localhost:5653',)
res5 = pool.map(host, range(10))
print pool
print '\n'.join(res5)
print stats()
print ''

如上,您可以在初始化Pool 实例时将ncpusservers 设置为关键字。

结果如下所示:

Evaluate 10 items on 2 cpus
<pool ParallelPool(ncpus=2, servers=('localhost:5653',))>
Rank: 0 -- hilbert.local
Rank: 1 -- hilbert.local
Rank: 2 -- hilbert.local
Rank: 3 -- hilbert.local
Rank: 4 -- hilbert.local
Rank: 5 -- hilbert.local
Rank: 6 -- hilbert.local
Rank: 7 -- hilbert.local
Rank: 8 -- hilbert.local
Rank: 9 -- hilbert.local
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
        10 |        100.00 |      10.0459 |     1.004588 | local
Time elapsed since server creation 5.0402431488
0 active tasks, 2 cores

如果您有多个服务器,并且可能有远程服务器,您只需向servers 元组添加更多条目。所以这不是一个完美的例子,因为它没有准确地展示如何让服务器在另一台机器上运行。但是,这是一个很好的例子,如果您确实计划使用 ssh tunnel,您应该知道您不会将 pathos 指向远程机器,而是使用隧道端口指向 localhost … 并连接到远程机器。

由于pathos 使用ppft(这是pp 的一个分支),您可以查看pp 中有关如何设置远程服务器的示例。基本上,你可以用一个 shell 脚本来做这样的事情:

for i in $nodes
do
    ssh -f $i /home/username/bin/ppserver.py -p $portnum -w 2 -t 30 &
done

这里的循环是接收到的节点(节点)。对于每个节点,使用ssh -f 命令启动ppserver,指定端口(-p)、两个工作人员(-w)和空闲 30 秒后超时(-t)。请参阅pp 文档 (http://www.parallelpython.com/content/view/15/30)。使用pathos,您只需要启动ppserver 并指定端口即可使其工作。然后,将主机名和端口添加到第一个代码块中的 server 元组中。

但是,如果您不喜欢手动设置,pathos 确实提供了设置 tunnelppserver 的脚本。使用脚本不如手动操作灵活,而且在出现问题时更难诊断……但尽管如此……请在此处查看脚本:https://github.com/uqfoundation/pathos/tree/master/scripts。

【讨论】:

还有一些事情:(1)我是pathos作者,(2)分布式计算相当脆弱,所以要预先警告,事情会在某个时候失败,并留下一团糟清理,(3)你要分发的函数的成本必须高于连接到分布式集群、启动 python 实例和隧道对象的成本,以及(4)你有在所有机器上安装相同版本的ppft,否则会出错。

以上是关于Python中的分布式多处理池的主要内容,如果未能解决你的问题,请参考以下文章

具有分布式集群的 Python 多处理

python爬虫之线程池和进程池

python多处理池:我怎么知道池中的所有工作人员何时完成?

python中的多处理[破池进程]

Python都需要那些技术???

如何为多处理池中的单个进程分配 python 请求会话?