Python多处理:如何创建x个进程并返回返回值

Posted

技术标签:

【中文标题】Python多处理:如何创建x个进程并返回返回值【英文标题】:Python multiprocessing: how to create x number of processes and get return value back 【发布时间】:2021-10-15 22:31:38 【问题描述】:

我有一个使用线程创建的程序,但后来我了解到线程不会在 python 中并发运行,而进程可以。结果,我试图使用多处理重写程序,但我很难这样做。我尝试了几个示例来展示如何创建进程和池,但我不认为这正是我想要的。

以下是我尝试过的代码。该程序试图通过在包含圆的图形上随机放置点来估计 pi ​​的值。该程序有两个命令行参数:一个是我要创建的线程/进程的数量,另一个是尝试在图表上放置的点的总数 (N)。

import math
import sys
from time import time
import concurrent.futures
import random
import multiprocessing as mp


def myThread(arg):
    # Take care of imput argument
    n = int(arg)
    print("Thread received. n = ", n)

    # main calculation loop
    count = 0
    for i in range (0, n):
        x = random.uniform(0,1)
        y = random.uniform(0,1)
        d = math.sqrt(x * x + y * y)
        if (d < 1):
            count = count + 1
    print("Thread found ", count, " points inside circle.")
    return count;
        
# end myThread

# receive command line arguments
if (len(sys.argv) == 3):
    N = sys.argv[1]  # original ex: 0.01
    N = int(N)
    totalThreads = sys.argv[2]
    totalThreads = int(totalThreads)
    print("N = ", N)
    print("totalThreads = ", totalThreads)
else:
    print("Incorrect number of arguments!")
    sys.exit(1)

if ((totalThreads == 1) or (totalThreads == 2) or (totalThreads == 4) or (totalThreads == 8)):
    print()
else:
    print("Invalid number of threads. Please use 1, 2, 4, or 8 threads.")
    sys.exit(1)

# start experiment
t = int(time() * 1000)  # begin run time
total = 0

# ATTEMPT 1
# processes = []
# for i in range(totalThreads):
#     process = mp.Process(target=myThread, args=(N/totalThreads))
#     processes.append(process)
#     process.start()
    
# for process in processes:
#     process.join()

# ATTEMPT 2
#pool = mp.Pool(mp.cpu_count())
#total = pool.map(myThread, [N/totalThreads])

# ATTEMPT 3
#for i in range(totalThreads):
    #total = total + pool.map(myThread, [N/totalThreads])
#    p = mp.Process(target=myThread, args=(N/totalThreads))
#    p.start()

# ATTEMPT 4
# with concurrent.futures.ThreadPoolExecutor() as executor:
#     for i in range(totalThreads):
#         future = executor.submit(myThread, N/totalThreads) # start thread
#         total = total + future.result() # get result

# analyze results
pi = 4 * total / N
print("pi estimate =", pi)
delta_time = int(time() * 1000) - t # calculate time required
print("Time =", delta_time, " milliseconds")

我认为创建一个从 0 到 totalThreads 的循环,为每次迭代创建一个进程是可行的。我还想传入 N/totalThreads(以划分工作),但似乎进程接受一个可迭代列表而不是传递给方法的参数。

多处理缺少什么?甚至有可能对流程做我想做的事情吗? 提前感谢您的帮助,非常感谢:)

【问题讨论】:

简短的回答是你可以用进程做你想做的事。但不清楚你的目标是什么。您的函数 myThread 看起来会执行得非常快(除非传递的参数很大)。与传统的多线程相比,在构建和启动进程时会有更大的(并且在您的情况下可能很重要)开销。所以,如果你追求的是性能,我会坚持使用线程 @DarkKnight 这是一个练习程序,用于我要做的涉及大量计算的事情。主要是这样做,以便在迁移到更大的程序之前,我可以熟悉 python 中的线程/处理是如何工作的 你应该将参数作为元组传递,所以你想要args=(N/totalThreads,)而不是args=(N/totalThreads)。进程通常比线程更昂贵,并且在某些情况下开销可能太大。如果你真的想快速做到这一点,你应该考虑使用numba。从字面上看,他们的首页以蒙特卡罗模拟为例。 【参考方案1】:

我已经简化了您的代码并使用了一些可能合理也可能不合理的硬编码值。

import math
import concurrent.futures
import random
from datetime import datetime


def myThread(arg):
    count = 0
    for i in range(0, arg[0]):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        d = math.sqrt(x * x + y * y)
        if (d < 1):
            count += 1
    return count


N = 10_000
T = 8

_start = datetime.now()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = executor.submit(myThread, (int(N / T),)): _ for _ in range(T)
    total = 0
    for future in concurrent.futures.as_completed(futures):
        total += future.result()
_end = datetime.now()
print(f'Estimate for PI = 4 * total / N')
print(f'Run duration = _end-_start')

我机器上的典型输出如下所示:-

估计 PI = 3.1472 运行时长 = 0:00:00.008895

请记住,您启动的线程数由 ThreadPoolExecutor (TPE) [在没有参数构造时] 有效管理。它根据机器的处理能力(内核数量等)决定可以运行的线程数。因此,如果您真的愿意,您可以将 T 设置为一个非常高的数字,并且 TPE 将阻止任何新线程的执行,直到它确定有容量。

【讨论】:

将futures保存为字典中的键,其值是submit的对应参数,如果您需要在处理完成而不是提交顺序时恢复参数,则很有用你在这里做。 但是没有恢复这里的参数,因此将期货保存在列表中更有意义。另外,为什么 submit 的参数是一个带有单个 int 元素的元组,而不是单个 int 元素?如果这是一个常数,为什么还要通过它呢? 顺便说一下,不使用多线程或多处理,即`total = 0;对于_在范围内(T):; total += myThread(int(N / T)) ` 运行得更快。也就是说,无论是多线程还是多处理,这都不是一个好的候选。 @Booboo 我完全同意在这种情况下多线程/处理是多余的。但是,OP 确实说过他的示例是一个“实践”程序,用于更实质性的东西,我猜它可能适合 MP 或 MT。我的代码显示了它是如何完成的 - 不一定是应该如何完成 我的最后一条评论更多是为了 OP 的利益而不是您的利益,两个 cmets 只是一些观察;我确实赞成你的回答。

以上是关于Python多处理:如何创建x个进程并返回返回值的主要内容,如果未能解决你的问题,请参考以下文章

Python如何使用多进程加速获取请求

Python多处理 - 产生的进程终止时主进程不会继续

如何使用线程去执行一个 有返回值的方法,并获取返回值?

Python 多处理返回结果,记录并在 Windows 上冻结运行

python如何将多个进程同时运行成千上万个进程

python执行多进程时,如何获取函数返回的值