python多处理子进程未正常退出
Posted
技术标签:
【中文标题】python多处理子进程未正常退出【英文标题】:python multiprocessing child processes not quiting normally 【发布时间】:2022-01-12 06:20:32 【问题描述】:我一直在使用 python 多处理来处理一些任务。开发环境是 Windows Server 2016 和 python 3.7.0。 有时会有子进程留在任务列表中。但实际上,它们似乎已完成(数据已写入数据库)。影响是日志卡在那里,无法附加最新的日志。
这里是代码。主函数启动一个监听进程和几个工作进程:
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process, args=(queue, listener_configurer))
listener.start()
...
workers = []
for loop:
worker = multiprocessing.Process(target=process_start, args=(queue, worker_configurer, plist))
workers.append(worker)
worker.start()
for w in workers:
w.join()
...
queue.put_nowait(None)
listener.join()
监听进程在它得到 None 时结束,从而导致整个任务结束。
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None:
break
if type(record) is not int:
Logger = logging.getLogger(record.name)
Logger.handle(record)
except Exception as e:
Logger.error(str(e), exc_info=True)
任务由 Windows 任务调度程序安排运行。 知道为什么一些多处理进程“卡”在那里吗? 它困扰了我一段时间。提前致谢。
【问题讨论】:
语义 nitpick:windows 没有“fork”。在 Windows 上没有直接的模拟。仅生成。 "The Impact is that the logging sticked there"是什么意思? 查看这篇文章,只是帮助我解决了听起来类似的问题:pythonspeed.com/articles/python-multiprocessing @Timus,对不起,我没有说清楚。 'Stuck' 表示如果之前的任务留下一些子进程未结束,则当任务按计划再次启动时,日志文件无法更新。 【参考方案1】:我可以确定您的问题是什么吗?不,我可以肯定地说你正在做一些可能导致僵局的事情吗?是的。
如果您仔细阅读multiprocessing.Queue
上的文档,您将看到以下警告:
警告: 如上所述,如果子进程已将项目放入队列(并且尚未使用
JoinableQueue.cancel_join_thread
),则该进程将不会终止,直到所有缓冲项目都已刷新到管道。这意味着如果您尝试加入该进程,您可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。同样,如果子进程是非守护进程,则父进程在尝试加入其所有非守护子进程时可能会在退出时挂起。
请注意,使用管理器创建的队列不存在此问题。请参阅编程指南。
这意味着为了完全安全,在加入workers
进程(向队列中发出put)之前,您必须先加入侦听器进程(从队列中发出gets),以确保所有消息都放入在您尝试加入已完成放入队列的任务之前,队列已从队列中读取。
但是,如果当前正在寻找主进程向队列写入None
sentinel 消息,表示它正在退出时间但在新设计的主进程必须先等待侦听器终止,然后再等待工作人员终止?大概您可以控制process_start
函数的来源,该函数实现了写入队列的消息的生产者,并且可能有什么触发了它终止的决定。当这些进程终止时,它们必须各自向队列写入None
标记消息,表示它们将不再产生任何消息。然后函数listener_process
必须传递一个附加参数,即消息生产者的数量,以便它知道它应该期望看到多少个这些sentinels。不幸的是,我无法从您编码的内容(即for loop:
)中辨别出该进程的数量是多少,并且您似乎正在使用相同的参数实例化每个进程。但为了清楚起见,我会将您的代码修改为更明确的内容:
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process, args=(queue, listener_configurer, len(plist)))
listener.start()
...
workers = []
# There will be len(plist) producer of messages:
for p in plist:
worker = multiprocessing.Process(target=process_start, args=(queue, worker_configurer, p))
workers.append(worker)
worker.start()
listener.join() # join the listener first
for w in workers:
w.join()
....
def listener_process(queue, configurer, n_producers):
configurer()
sentinel_count = 0
while True:
try:
record = queue.get()
if record is None:
sentinel_count += 1
if sentinel_count == n_producers:
break # we are done
continue
if type(record) is not int:
Logger = logging.getLogger(record.name)
Logger.handle(record)
except Exception as e:
Logger.error(str(e), exc_info=True)
更新
这是一个完整的例子。但是为了避免使用处理程序配置各种记录器的复杂性,我只是使用了一个简单的打印语句。但正如你所见,一切都被“记录”了。
import multiprocessing
def process_start(queue, p):
for i in range(3):
queue.put(p)
queue.put(None) # Sentinel
def listener_process(queue, n_producers):
sentinel_count = 0
while True:
try:
record = queue.get()
if record is None:
sentinel_count += 1
if sentinel_count == n_producers:
break # we are done
continue
if type(record) is not int:
print(record)
except Exception as e:
print(e)
class Record:
def __init__(self, name, value):
self.name = name
self.value = value
def __repr__(self):
return f'name=self.name, value=self.value'
def main():
plist = [Record('basic', 'A'), Record('basic', 'B'), Record('basic', 'C')]
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process, args=(queue, len(plist)))
listener.start()
workers = []
# There will be len(plist) producer of messages:
for p in plist:
worker = multiprocessing.Process(target=process_start, args=(queue, p))
workers.append(worker)
worker.start()
listener.join() # join the listener first
for w in workers:
w.join()
# Required for Windows:
if __name__ == '__main__':
main()
打印:
name=basic, value=A
name=basic, value=A
name=basic, value=A
name=basic, value=B
name=basic, value=B
name=basic, value=B
name=basic, value=C
name=basic, value=C
name=basic, value=C
【讨论】:
这解决了您的问题吗? 感谢您的回答。我试过你的解决方案。当工作人员完成并在侦听器中计数时,我让他们将 int 1 放入队列中。侦听器可以按预期退出。但是,如果侦听器在工作人员之前加入而不是在主函数结束时加入。监听器加入后的代码不会被记录。也许是因为所有日志都绑定到同一个队列? 我不确定你在说什么,我已经用一个完整的示例更新了我的示例,用打印语句代替了日志记录以保持简单。确保您遵循此示例。 必须在listener_process
的过程中定义和配置记录器,因为这是记录记录的地方。以上是关于python多处理子进程未正常退出的主要内容,如果未能解决你的问题,请参考以下文章