python多处理子进程未正常退出

Posted

技术标签:

【中文标题】python多处理子进程未正常退出【英文标题】:python multiprocessing child processes not quiting normally 【发布时间】:2022-01-12 06:20:32 【问题描述】:

我一直在使用 python 多处理来处理一些任务。开发环境是 Windows Server 2016 和 python 3.7.0。 有时会有子进程留在任务列表中。但实际上,它们似乎已完成(数据已写入数据库)。影响是日志卡在那里,无法附加最新的日志。

这里是代码。主函数启动一个监听进程和几个工作进程:

queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process, args=(queue, listener_configurer))
listener.start()

...

workers = []
for loop:
    worker = multiprocessing.Process(target=process_start, args=(queue, worker_configurer, plist))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

...

queue.put_nowait(None)
listener.join()

监听进程在它得到 None 时结束,从而导致整个任务结束。

def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            if type(record) is not int:
                Logger = logging.getLogger(record.name)
                Logger.handle(record)
        except Exception as e:
            Logger.error(str(e), exc_info=True)

任务由 Windows 任务调度程序安排运行。 知道为什么一些多处理进程“卡”在那里吗? 它困扰了我一段时间。提前致谢。

【问题讨论】:

语义 nitpick:windows 没有“fork”。在 Windows 上没有直接的模拟。仅生成。 "The Impact is that the logging sticked there"是什么意思? 查看这篇文章,只是帮助我解决了听起来类似的问题:pythonspeed.com/articles/python-multiprocessing @Timus,对不起,我没有说清楚。 'Stuck' 表示如果之前的任务留下一些子进程未结束,则当任务按计划再次启动时,日志文件无法更新。 【参考方案1】:

我可以确定您的问题是什么吗?不,我可以肯定地说你正在做一些可能导致僵局的事情吗?是的。

如果您仔细阅读multiprocessing.Queue 上的文档,您将看到以下警告:

警告: 如上所述,如果子进程已将项目放入队列(并且尚未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲项目都已刷新到管道。

这意味着如果您尝试加入该进程,您可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。同样,如果子进程是非守护进程,则父进程在尝试加入其所有非守护子进程时可能会在退出时挂起。

请注意,使用管理器创建的队列不存在此问题。请参阅编程指南。

这意味着为了完全安全,在加入workers 进程(向队列中发出put)之前,您必须先加入侦听器进程(从队列中发出gets),以确保所有消息都放入在您尝试加入已完成放入队列的任务之前,队列已从队列中读取。

但是,如果当前正在寻找主进程向队列写入None sentinel 消息,表示它正在退出时间但在新设计的主进程必须先等待侦听器终止,然后再等待工作人员终止?大概您可以控制process_start 函数的来源,该函数实现了写入队列的消息的生产者,并且可能有什么触发了它终止的决定。当这些进程终止时,它们必须各自向队列写入None 标记消息,表示它们将不再产生任何消息。然后函数listener_process 必须传递一个附加参数,即消息生产者的数量,以便它知道它应该期望看到多少个这些sentinels。不幸的是,我无法从您编码的内容(即for loop:)中辨别出该进程的数量是多少,并且您似乎正在使用相同的参数实例化每个进程。但为了清楚起见,我会将您的代码修改为更明确的内容:

queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process, args=(queue, listener_configurer, len(plist)))
listener.start()

...

workers = []
# There will be len(plist) producer of messages:
for p in plist:
    worker = multiprocessing.Process(target=process_start, args=(queue, worker_configurer, p))
    workers.append(worker)
    worker.start()
listener.join() # join the listener first
for w in workers:
    w.join()


....


def listener_process(queue, configurer, n_producers):
    configurer()
    sentinel_count = 0
    while True:
        try:
            record = queue.get()
            if record is None:
                sentinel_count += 1
                if sentinel_count == n_producers:
                    break # we are done
                continue
            if type(record) is not int:
                Logger = logging.getLogger(record.name)
                Logger.handle(record)
        except Exception as e:
            Logger.error(str(e), exc_info=True)

更新

这是一个完整的例子。但是为了避免使用处理程序配置各种记录器的复杂性,我只是使用了一个简单的打印语句。但正如你所见,一切都被“记录”了。

import multiprocessing

def process_start(queue, p):
    for i in range(3):
        queue.put(p)
    queue.put(None) # Sentinel


def listener_process(queue, n_producers):
    sentinel_count = 0
    while True:
        try:
            record = queue.get()
            if record is None:
                sentinel_count += 1
                if sentinel_count == n_producers:
                    break # we are done
                continue
            if type(record) is not int:
                print(record)
        except Exception as e:
            print(e)


class Record:
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return f'name=self.name, value=self.value'


def main():
    plist = [Record('basic', 'A'), Record('basic', 'B'), Record('basic', 'C')]

    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process, args=(queue, len(plist)))
    listener.start()

    workers = []
    # There will be len(plist) producer of messages:
    for p in plist:
        worker = multiprocessing.Process(target=process_start, args=(queue, p))
        workers.append(worker)
        worker.start()
    listener.join() # join the listener first
    for w in workers:
        w.join()

# Required for Windows:
if __name__ == '__main__':
    main()

打印:

name=basic, value=A
name=basic, value=A
name=basic, value=A
name=basic, value=B
name=basic, value=B
name=basic, value=B
name=basic, value=C
name=basic, value=C
name=basic, value=C

【讨论】:

这解决了您的问题吗? 感谢您的回答。我试过你的解决方案。当工作人员完成并在侦听器中计数时,我让他们将 int 1 放入队列中。侦听器可以按预期退出。但是,如果侦听器在工作人员之前加入而不是在主函数结束时加入。监听器加入后的代码不会被记录。也许是因为所有日志都绑定到同一个队列? 我不确定你在说什么,我已经用一个完整的示例更新了我的示例,用打印语句代替了日志记录以保持简单。确保您遵循此示例。 必须在listener_process 的过程中定义和配置记录器,因为这是记录记录的地方。

以上是关于python多处理子进程未正常退出的主要内容,如果未能解决你的问题,请参考以下文章

多任务编程 -- 孤儿进程和僵尸进程

多任务编程 -- 孤儿进程和僵尸进程

Python 子进程返回错误的退出代码

python threading父进程不死,子线程不退出..如何才能使用完线程后回收线程?

Python 子进程、通信和多处理/多线程

python多处理子进程无法访问全局变量