Python多处理脚本部分输出
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多处理脚本部分输出相关的知识,希望对你有一定的参考价值。
我遵循这个post中规定的原则来安全地输出最终将被写入文件的结果。不幸的是,代码只打印1和2,而不是3到6。
import os
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
def feed(queue, parlist):
for par in parlist:
queue.put(par)
print("Queue size", queue.qsize())
def calc(queueIn, queueOut):
while True:
try:
par=queueIn.get(block=False)
res=doCalculation(par)
queueOut.put((res))
queueIn.task_done()
except:
break
def doCalculation(par):
return par
def write(queue):
while True:
try:
par=queue.get(block=False)
print("response:",par)
except:
break
if __name__ == "__main__":
nthreads = 2
workerQueue = Queue()
writerQueue = Queue()
considerperiod=[1,2,3,4,5,6]
feedProc = Process(target=feed, args=(workerQueue, considerperiod))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target=write, args=(writerQueue,))
feedProc.start()
feedProc.join()
for p in calcProc:
p.start()
for p in calcProc:
p.join()
writProc.start()
writProc.join()
在运行它打印的代码时,
$ python3 tst.py
Queue size 6
response: 1
response: 2
此外,是否可以确保写入功能始终输出1,2,3,4,5,6,即按照与馈送队列相同的顺序输出?
答案
错误是以某种方式与task_done()
调用。如果你删除那个,那么它的工作原理,不要问我为什么(IMO这是一个错误)。但它的工作方式是queueIn.get(block=False)
调用抛出异常,因为队列是空的。这可能仅适用于您的用例,但更好的方法是使用标记(如multiprocessing docs, see last example中所述)。这里有一点重写,所以你的程序使用了哨兵:
import os
import argparse
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
def feed(queue, parlist, nthreads):
for par in parlist:
queue.put(par)
for i in range(nthreads):
queue.put(None)
print("Queue size", queue.qsize())
def calc(queueIn, queueOut):
while True:
par=queueIn.get()
if par is None:
break
res=doCalculation(par)
queueOut.put((res))
def doCalculation(par):
return par
def write(queue):
while not queue.empty():
par=queue.get()
print("response:",par)
if __name__ == "__main__":
nthreads = 2
workerQueue = Queue()
writerQueue = Queue()
considerperiod=[1,2,3,4,5,6]
feedProc = Process(target=feed, args=(workerQueue, considerperiod, nthreads))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target=write, args=(writerQueue,))
feedProc.start()
feedProc.join()
for p in calcProc:
p.start()
for p in calcProc:
p.join()
writProc.start()
writProc.join()
有几点需要注意:
- 哨兵将
None
放入队列。请注意,每个工作进程都需要一个标记。 - 对于
write
函数,你不需要进行哨兵处理,因为只有一个进程,你不需要处理并发性(如果你在empty()
函数中做get()
然后calc
thingie你会遇到问题,如果例如,队列中只剩下一个项目,两个工人同时检查empty()
,然后两个都想做get()
,然后其中一个永久锁定) - 你不需要将
feed
和write
放入进程中,只需将它们放入主函数中,因为你不想并行运行它。
如何在输出中输出与输入相同的顺序? [...]我想multiprocessing.map可以做到这一点
是的map keeps the order。将程序重写为更简单的东西(因为您不需要workerQueue
和writerQueue
并添加随机睡眠以证明输出仍然有序:
from multiprocessing import Pool
import time
import random
def calc(val):
time.sleep(random.random())
return val
if __name__ == "__main__":
considerperiod=[1,2,3,4,5,6]
with Pool(processes=2) as pool:
print(pool.map(calc, considerperiod))
以上是关于Python多处理脚本部分输出的主要内容,如果未能解决你的问题,请参考以下文章
在 Python 多处理进程中运行较慢的 OpenCV 代码片段