具有许多进程的 Python multiprocessing.Pool [关闭]

Posted

技术标签:

【中文标题】具有许多进程的 Python multiprocessing.Pool [关闭]【英文标题】:Python multiprocessing.Pool with many processes [closed] 【发布时间】:2013-03-06 15:44:48 【问题描述】:

我正在尝试创建许多并行进程来利用 32 核机器,但是当我查看顶部屏幕时,它只显示了 5 个 Python 进程。这是我的代码:

max_processes = min(len(corpus_paths), cpu_count()*2)
__log.debug("Max processes being used: " + str(max_processes))
pool = Pool(max_processes)
for path in corpus_paths:
    pool.apply_async(...)
pool.close()
pool.join()

这是机器的配置:

[minh.lengoc@compute-1-5 ~]$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                32
On-line CPU(s) list:   0-31
Thread(s) per core:    2
Core(s) per socket:    8
CPU socket(s):         2
NUMA node(s):          4
Vendor ID:             AuthenticAMD
CPU family:            21
Model:                 1
Stepping:              2
CPU MHz:               2099.877
BogoMIPS:              4199.44
Virtualization:        AMD-V
L1d cache:             16K
L1i cache:             64K
L2 cache:              2048K
L3 cache:              6144K
NUMA node0 CPU(s):     0,2,4,6,8,10,12,14
NUMA node1 CPU(s):     16,18,20,22,24,26,28,30
NUMA node2 CPU(s):     1,3,5,7,9,11,13,15
NUMA node3 CPU(s):     17,19,21,23,25,27,29,31

谢谢!


现在可以了。我的代码一定有问题,但我无法回滚查看它是什么。关闭。

【问题讨论】:

让我猜猜:它们都是从同一个磁盘读取的? 请问从同一个磁盘读取意味着什么? 磁盘经常是多核程序的瓶颈。如果进程无法以足够快的速度从磁盘获取工作,它们将遭受资源匮乏的困扰。 @larsmans:尽管如此,仍然会产生超过 5 个进程。 【参考方案1】:

没有使用所有内核的一个可能原因是 pool.apply_async 运行的目标函数完成得太快。这种情况下的解决方案是向目标函数发送更多数据(因此每次调用它会做更多的工作)。

这就像把煤铲进 32 个熔炉。如果你用小铲子,你可能只能在第一个炉子里的煤用完之前到达第五个炉子。然后你必须重新装满第一个熔炉。即使你有一大堆煤,你也永远无法使用所有的熔炉。如果您使用足够大的铲子,那么您可以将所有熔炉都烧掉。

【讨论】:

谢谢,但事实并非如此,因为每个数据块大约 120MB 也许是个愚蠢的问题,但max_processes 报告的值是多少? 是的,我自己问过这个问题。我打印出来是 64。 len(corpus_paths) 是什么? 是 128(我有大约 16GB 的数据,所以我把它分成了很多小部分)【参考方案2】:

我有一个类似的问题,在我的情况下,我使用的是 gearman,并且希望每个核心都有工作人员,最初使用“Pool”,但注意到只有一个工作人员正在处理消息,所以我用下面的代码替换了“Pool”使用所有“核心 - 1”,以便我可以让工作人员同时读取队列:

if __name__ == '__main__':
jobs = []
for i in range(multiprocessing.cpu_count() - 1): 
    p = multiprocessing.Process(target=start_worker)
    jobs.append(p)
    p.start()

for j in jobs:
    j.join()
    print '%s.exitcode = %s' % (j.name, j.exitcode)

你觉得呢?有什么更好的方法/想法来处理这个问题?

【讨论】:

以上是关于具有许多进程的 Python multiprocessing.Pool [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

Python多进程相关的坑

python 进程池的使用

python并发之multiprocessing

Python爬虫提速小技巧,多线程与多进程(附源码示例)

准确确定在 Python 多处理期间腌制的内容

python 对mongodb进行压力测试