Python 多处理正确完成工作,但进程仍然存在(Linux)

Posted

技术标签:

【中文标题】Python 多处理正确完成工作,但进程仍然存在(Linux)【英文标题】:Python multiprocessing finish the work correctly, but the processes still alive (Linux) 【发布时间】:2021-10-04 04:57:58 【问题描述】:

我使用 python 多处理从一个大文件中计算 DNA 序列的某种分数。 为此,我编写并使用下面的脚本。 我在 python 3.8 环境中使用 48 cpu 的 Linux 机器。

代码工作正常,并正确终止工作并在最后打印处理时间。

问题:当我使用 htop 命令时,发现所有 48 个进程都还活着。 我不知道为什么,也不知道要在脚本中添加什么来避免这种情况。


import csv
import sys
import concurrent.futures
from itertools import combinations
import psutil
import time


nb_cpu = psutil.cpu_count(logical=False)


def fun_job(seq_1, seq_2): # seq_i : (id, string)
    start = time.time()
    score_dist = compute_score_dist(seq_1[1], seq_2[1])
    end = time.time()

    return seq_1[0], seq_2[0], score_dist, end - start # id seq1, id seq2, score, time


def help_fun_job(nested_pair):
    return fun_job(nested_pair[0], nested_pair[1])


def compute_using_multi_processing(list_comb_ids, dict_ids_seqs):
    start = time.perf_counter()

    with concurrent.futures.ProcessPoolExecutor(max_workers=nb_cpu) as executor:
        results = executor.map(help_fun_job,
                               [((pair_ids[0], dict_ids_seqs[pair_ids[0]]), (pair_ids[1], dict_ids_seqs[pair_ids[1]]))
                                for pair_ids in list_comb_ids])

    save_results_to_csv(results)

    finish = time.perf_counter()

    proccessing_time = str(datetime.timedelta(seconds=round(finish - start, 2)))
    print(f' Processing time Finished in proccessing_time hh:mm:ss')

def main():
    print("nb_cpu in this machine : ", nb_cpu)

    file_path = sys.argv[1]

    dict_ids_seqs = get_dict_ids_seqs(file_path)

    list_ids = list(dict_ids_seqs)  # This will convert the dict_keys to a list
    list_combined_ids = list(combinations(list_ids, 2))
    
    compute_using_multi_processing(list_combined_ids, dict_ids_seqs)


if __name__ == '__main__':
    main()

感谢您的帮助。

编辑:添加 fun_job 的完整代码(在@Booboo 回答之后)

from Bio import Align

def fun_job(seq_1, seq_2): # seq_i : (id, string)
    start = time.time()

    aligner = Align.PairwiseAligner()
    aligner.mode = 'global'
    score_dist = aligner.score(seq_1[1],seq_2[1])    

    end = time.time()

    return seq_1[0], seq_2[0], score_dist, end - start # id seq1, id seq2, score, time

【问题讨论】:

你在哪里打电话join() 没有开始,用这个语法加入(concurrent.futures)。 【参考方案1】:

with ... as executor: 块退出时,会隐式调用executor.shutdown(wait=True)。这将等待所有待处理的期货完成执行“并且与执行程序关联的资源已被释放”,这可能包括终止池中的进程(如果可能的话?)。为什么您的程序终止(或终止?)或者至少您说所有期货都已完成执行,而进程尚未终止,这有点神秘。但是您还没有提供fun_job 的代码,所以谁能说这是为什么?

您可能会尝试的一件事是从multiprocessing 模块切换到使用multiprocessing.pool.Pool 类。它支持terminate 方法,该方法在其上下文管理器with 块退出时被隐式调用,显式尝试终止池中的所有进程:

#import concurrent.futures
import multiprocessing
... # etc.

def compute_using_multi_processing(list_comb_ids, dict_ids_seqs):
    start = time.perf_counter()

    with multiprocessing.Pool(processes=nb_cpu) as executor:
        results = executor.map(help_fun_job,
                               [((pair_ids[0], dict_ids_seqs[pair_ids[0]]), (pair_ids[1], dict_ids_seqs[pair_ids[1]]))
                                for pair_ids in list_comb_ids])

    save_results_to_csv(results)

    finish = time.perf_counter()

    proccessing_time = str(datetime.timedelta(seconds=round(finish - start, 2)))
    print(f' Processing time Finished in proccessing_time hh:mm:ss')

【讨论】:

非常感谢您的支持和建议。 fun_job 的代码是 Bio python 的成对全局对齐形式。我会将它添加到上面的源代码中。 Bio Aligner 的函数形式返回完成工作并返回一个正确的值。不过不知道是不是这样,说不定回来后还有什么东西在后面。 使用multiprocessing.Pool 有什么不同吗? 是的,我使用了multiprocessing.Pool,效果很好。非常感谢您的帮助。但是我仍然不明白为什么 concurrent.futures.ProcessPoolExecutor 会导致问题,因为它只是 multiprocessing.Pool 的接口? (multiprocessing.Pool vs concurrent.futures.ProcessPoolExecutor) 首先,concurrent.futures.ProcessPoolExecutor不是基于multiprocessing.pool;这是一个单独的实现。其次,参见What should I do when someone answers my question?,尤其是关于接受答案的部分。另外,您引用的链接,如果您有时间,请查看我对该问题的回答。 完成 :) ,非常感谢 BooBoo :)

以上是关于Python 多处理正确完成工作,但进程仍然存在(Linux)的主要内容,如果未能解决你的问题,请参考以下文章

Python 多处理:所有进程在 5 秒内完成,但程序需要额外 10 秒才能返回读取主脚本

Python多处理:如何在异常时关闭多处理池

子python进程卡住了

甚至在完成后仍运行无限子进程的多处理模块

python多处理子进程未正常退出

多处理时正确检查文件是不是存在