multiprocessing.Pool() 比只使用普通函数慢

Posted

技术标签:

【中文标题】multiprocessing.Pool() 比只使用普通函数慢【英文标题】:multiprocessing.Pool() slower than just using ordinary functions 【发布时间】:2014-01-10 17:13:40 【问题描述】:

(这个问题是关于如何让 multiprocessing.Pool() 运行代码更快,我终于解决了,最终解决方案可以在帖子底部找到。)

原问题:

我正在尝试使用 Python 将一个单词与列表中的许多其他单词进行比较,并检索最相似的单词列表。为此,我使用了 difflib.get_close_matches 函数。我在一台相对较新且功能强大的 Windows 7 笔记本电脑上,使用 Python 2.6.5。

我想要的是加快比较过程,因为我的比较单词列表很长,我必须重复比较过程几次。当我听说多处理模块时,如果比较可以分解为工作任务并同时运行(从而利用机器功率换取更快的速度),我的比较任务会更快完成,这似乎是合乎逻辑的。

但是,即使尝试了许多不同的方法,并使用了文档中显示的方法和论坛帖子中建议的方法,Pool 方法似乎还是非常慢,比仅在一次完整的列表。我想帮助理解为什么 Pool() 这么慢以及我是否正确使用它。我只使用这个字符串比较场景作为例子,因为这是我能想到的最新例子,我无法理解或让多处理工作而不是反对我。下面是一个来自 difflib 场景的示例代码,显示了普通方法和池化方法之间的时间差异:

from multiprocessing import Pool
import random, time, difflib

# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"

# comparison function
def findclosematch(subwordlist):
    matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
    if matches <> []:
        return matches

# pool
print "pool method"
if __name__ == '__main__':
    pool = Pool(processes=3)
    t=time.time()
    result = pool.map_async(findclosematch, wordlist, chunksize=100)
    #do something with result
    for r in result.get():
        pass
    print time.time()-t

# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
    pass
print time.time()-t

要查找的单词是“hello”,要查找密切匹配的单词列表是 100 万长的 5 个随机连接字符的列表(仅用于说明目的)。我使用 3 个处理器内核和 map 函数,块大小为 100(我认为每个工人要处理的列表项??)(我也尝试了 1000 和 10 000 的块大小,但没有真正的区别)。请注意,在这两种方法中,我在调用我的函数之前启动计时器,并在遍历结果后立即结束它。正如您在下面看到的,计时结果显然有利于原始的非 Pool 方法:

>>> 
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>> 

Pool 方法几乎比原始方法慢 4 倍。我在这里遗漏了什么,或者对池化/多处理的工作方式有误解吗?我确实怀疑这里的部分问题可能是 map 函数返回 None 并因此将数千个不必要的项目添加到结果列表中,即使我只希望将实际匹配项返回到结果中并将其写入函数中。据我了解,这就是地图的工作原理。我听说过一些其他的功能,比如 filter 只收集非 False 结果,但我不认为 multiprocessing/Pool 支持 filter 方法。除了多处理模块中的 map/imap 之外,还有其他函数可以帮助我只返回函数返回的内容吗?据我了解,Apply 函数更多地用于提供多个参数。

我知道还有 imap 功能,我尝试过但没有任何时间改进。原因与我在理解 itertools 模块的优点时遇到问题的原因相同,据说是“闪电般快”,我注意到调用该函数是正确的,但根据我的经验和我所读到的因为调用函数实际上并不进行任何计算,所以当需要遍历结果以收集和分析它们时(没有它就没有调用函数的意义),它所花费的时间与只需使用普通版本的功能即可。但我想那是另一篇文章。

无论如何,很高兴看到有人可以在这里推动我朝着正确的方向前进,并且非常感谢任何帮助。我对理解多处理一般比让这个例子工作更感兴趣,尽管它对一些示例解决方案代码建议很有用,以帮助我理解。

答案:

似乎放缓与其他进程的启动时间缓慢有关。我无法让 .Pool() 函数足够快。我让它更快的最终解决方案是手动拆分工作负载列表,使用多个 .Process() 而不是 .Pool(),然后在队列中返回解决方案。但我想知道最关键的变化是否可能是根据要查找的主要词而不是要比较的词来划分工作量,也许是因为 difflib 搜索功能已经如此之快。这是同时运行 5 个进程的新代码,结果比运行简单代码快约 10 倍(6 秒对 55 秒)。除了 difflib 已经有多快之外,对于快速模糊查找非常有用。

from multiprocessing import Process, Queue
import difflib, random, time

def f2(wordlist, mainwordlist, q):
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)

if __name__ == '__main__':

    # constants (for 50 input words, find closest match in list of 100 000 comparison words)
    q = Queue()
    wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
    mainword = "hello"
    mainwordlist = [mainword for each in xrange(50)]

    # normal approach
    t = time.time()
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)
    print time.time()-t

    # split work into 5 or 10 processes
    processes = 5
    def splitlist(inlist, chunksize):
        return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
    print len(mainwordlist)/processes
    mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
    print "list ready"

    t = time.time()
    for submainwordlist in mainwordlistsplitted:
        print "sub"
        p = Process(target=f2, args=(wordlist,submainwordlist,q,))
        p.Daemon = True
        p.start()
    for submainwordlist in mainwordlistsplitted:
        p.join()
    print time.time()-t
    while True:
        print q.get()

【问题讨论】:

你试过增加块大小吗?比如 chunksize=100000 左右? 比较苹果和苹果,你应该比较:result = pool.map(findclosematch, wordlist) vs. result = map(findclosematch, wordlist) 然后更改调用,以便 findclosematch() 完成更多工作。否则pickling/unpickling参数将支配运行时间。 不要不要使用&lt;&gt;。它已被 很多 时间弃用,在 python3 中它将引发SyntaxError,因此使用它会使代码的前向兼容性大大降低。请注意,生成进程和进程间通信会花费很多。如果你想减少多进程的时间,你必须确保计算时间足够大,这样开销就无关紧要了。在你的情况下,我认为这不是真的。 另外,if matches: 检查完全没用,可能会产生错误。我只是尝试运行脚本,修改了一些参数并得到了TypeError: NoneType object is not iterable,因为那个虚假检查。 99.9% 的时间函数应该总是返回相同的时间。不要使用None 处理特殊情况的 void 结果,因为您只是在其余代码中使函数结果的处理复杂化。 【参考方案1】:

Pool.map 较慢,因为它需要时间来启动进程,然后将必要的内存从一个进程转移到所有进程,正如 Multimedia Mike 所说。我遇到了类似的问题,我切换到multiprocessing.Process

但是multiprocessing.ProcessPool.map 需要更多时间来启动进程

解决方案:

提前创建流程,将静态数据保存到流程中。 使用队列将数据传递给进程 还可以使用队列从进程接收结果。

通过这种方式,我设法在 3 秒内在带有 Windows 的酷睿 i5 8265U 处理器笔记本电脑上从 100 万个面部特征中搜索到最佳匹配。

代码 - multiprocess_queue_matcher.py:

import multiprocessing

from utils import utils

no_of_processes = 0
input_queues = []
output_queues = []
db_embeddings = []
slices = None


def set_data(no_of_processes1, input_queues1, output_queues1, db_embeddings1):
    global no_of_processes
    no_of_processes = no_of_processes1
    global input_queues
    input_queues = input_queues1
    global output_queues
    output_queues = output_queues1
    global db_embeddings
    print("db_embeddings1 size = " + str(len(db_embeddings1)))
    db_embeddings.extend(db_embeddings1)
    global slices
    slices = chunks()


def chunks():
    size = len(db_embeddings) // no_of_processes
    return [db_embeddings[i:i + size] for i in range(0, len(db_embeddings), size)]


def do_job2(slice, input_queue, output_queue):
    while True:
        emb_to_search = input_queue.get()
        dist1 = 2
        item1 = []
        data_slice = slice
        # emb_to_search = obj[1]
        for item in data_slice:
            emb = item[0]
            dist = utils.calculate_squared_distance(emb_to_search, emb)
            if dist < dist1:
                dist1 = dist
                item1 = item
                item1.append(dist1)
        output_queue.put(item1)
    # if return_value is None:
    #     return item1
    # else:
    #     return_value.set_value(None, item1[1], item1[2], item1[3], item1[4], dist1)


def submit_job(emb):
    for i in range(len(slices)):
        input_queues[i].put(emb)


def get_output_queues():
    return output_queues


def start_processes():
    # slice = self.chunks()
    # ctx = multiprocessing.get_context("spawn")
    # BaseManager.register('FaceData', FaceData)
    # manager = BaseManager()
    # manager.start()
    # return_values = []
    global no_of_processes
    global input_queues
    global output_queues
    processes = []
    pos = 0
    for i in range(no_of_processes):
        p = multiprocessing.Process(target=do_job2, args=(slices[i], input_queues[i], output_queues[i],))
        p.Daemon = True
        processes.append(p)
        pos += 1
        p.start()

然后在需要的地方使用这个模块。

flask 的高级启动代码:

mysql = None

db_operator = None

all_db_embeddings = []

input_queues = []
output_queues = []
no_of_processes = 4


@app.before_first_request
def initialize():
    global mysql
    global db_operator
    mysql = MySQL(app)
    db_operator = DBOperator(mysql)
    ret, db_embeddings, error_message = db_operator.get_face_data_for_all_face_ids_for_all_users()
    all_db_embeddings.extend(db_embeddings)
    for i in range(no_of_processes):
        in_q = multiprocessing.Queue()
        out_q = multiprocessing.Queue()
        input_queues.append(in_q)
        output_queues.append(out_q)
    multiprocess_queue_matcher.set_data(no_of_processes, input_queues, output_queues, all_db_embeddings)
    multiprocess_queue_matcher.start_processes()

在任何请求端点上按需将作业传递给进程

emb_to_match = all_db_embeddings[0][0]
    starttime = time.time()
    multiprocess_queue_matcher.submit_job(emb_to_match)
    outputs = []
    for i in range(no_of_processes):
        out_q = output_queues[i]
        outputs.append(out_q.get())
    max = [None, None, None, None, None, 2.0]
    for val in outputs:
        if val[5] < max[5]:
            max = val
    time_elapsed = time.time() - starttime
    return jsonify(
        "status": "success", "message": "Face search completed", "best_match_faceid": max[1],
         "name": max[2], "distance": max[5], "search_time": time_elapsed)

此代码有什么建议和改进吗?

【讨论】:

【参考方案2】:

这些问题通常归结为以下几点:

您尝试并行化的函数不需要足够的 CPU 资源(即 CPU 时间)来合理化并行化!

当然,当您与 multiprocessing.Pool(8) 并行化时,理论上(但实际上并非如此) 可以得到 8 倍 的加速。

但是,请记住,这不是免费的 - 您获得这种并行化的代价是以下开销:

    为传递给Pool.map(f, iter)iter 中的每个chunk(大小为chunksize)创建一个task 对于每个task
      序列化tasktask's返回值(想想pickle.dumps()) 反序列化 tasktask's 返回值(想想 pickle.loads()) 浪费大量时间在共享内存 Queues 上等待 Locks,而工作进程和父进程 get()put() 从/到这些 Queues
    为每个工作进程调用一次os.fork() 的成本很高。

本质上,当你想使用Pool()时:

    CPU 资源要求高 传递给每个函数调用的数据占用少 相当长的iter 以证明上述 (3) 的一次性成本是合理的。

For a more in-depth exploration, this post and linked talk 演练如何将大量数据传递给Pool.map()和朋友)让您陷入困境。

Raymond Hettinger also talks about proper use of Python's concurrency here.

【讨论】:

请注意,上面的链接引用了我的 Python 波士顿用户组演讲和博客文章。【参考方案3】:

我在另一个问题上遇到了与 Pool 类似的情况。目前我不确定真正的原因......

答案 由 OP Karim Bahgat 编辑,与对我有用的解决方案相同。切换到 Process & Queue 系统后,我能够看到加速与机器的内核数量成正比。

这是一个例子。

def do_something(data):
    return data * 2

def consumer(inQ, outQ):
    while True:
        try:
            # get a new message
            val = inQ.get()

            # this is the 'TERM' signal
            if val is None:
                break;

            # unpack the message
            pos = val[0]  # its helpful to pass in/out the pos in the array
            data = val[1]

            # process the data
            ret = do_something(data)

            # send the response / results
            outQ.put( (pos, ret) )


        except Exception, e:
            print "error!", e
            break

def process_data(data_list, inQ, outQ):
    # send pos/data to workers
    for i,dat in enumerate(data_list):
        inQ.put( (i,dat) )

    # process results
    for i in range(len(data_list)):
        ret = outQ.get()
        pos = ret[0]
        dat = ret[1]
        data_list[pos] = dat


def main():
    # initialize things
    n_workers = 4
    inQ = mp.Queue()
    outQ = mp.Queue()
    # instantiate workers
    workers = [mp.Process(target=consumer, args=(inQ,outQ))
               for i in range(n_workers)]

    # start the workers
    for w in workers:
        w.start()

    # gather some data
    data_list = [ d for d in range(1000)]

    # lets process the data a few times
    for i in range(4):
        process_data(data_list)

    # tell all workers, no more data (one msg for each)
    for i in range(n_workers):
        inQ.put(None)
    # join on the workers
    for w in workers:
        w.join()

    # print out final results  (i*16)
    for i,dat in enumerate(data_list):
        print i, dat

【讨论】:

【参考方案4】:

我最好的猜测是进程间通信 (IPC) 开销。在单进程实例中,单进程有词表。当委派给其他各种进程时,主进程需要不断地将列表的各个部分传递给其他进程。

因此,更好的方法可能是分拆 n 个进程,每个进程负责加载/生成列表的 1/n 段和检查单词是否在列表的那部分。

不过,我不确定如何使用 Python 的多处理库来做到这一点。

【讨论】:

我同意并怀疑存在诸如进程启动时间和通信之类的东西阻碍了我的脚本。我最终使用了 multiprocessing.Process 函数,它允许我手动拆分我的列表并进行 10 倍的时间改进。有关我使用的新代码,请参阅我更新的帖子。

以上是关于multiprocessing.Pool() 比只使用普通函数慢的主要内容,如果未能解决你的问题,请参考以下文章

如何将二维数组作为 multiprocessing.Array 传递给 multiprocessing.Pool?

Python 多进程编程之multiprocessing--Pool

我们啥时候应该调用 multiprocessing.Pool.join?

`multiprocessing.Pool.map()` 似乎安排错误

python之multiprocessing:multiprocessing.Pool

multiprocessing.pool.ApplyResult 的文档在哪里?