Python多处理脚本部分输出

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多处理脚本部分输出相关的知识,希望对你有一定的参考价值。

我遵循这个post中规定的原则来安全地输出最终将被写入文件的结果。不幸的是,代码只打印1和2,而不是3到6。

import os
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep


def feed(queue, parlist):

    for par in parlist:
            queue.put(par)
    print("Queue size", queue.qsize())

def calc(queueIn, queueOut):

    while True:
        try:
            par=queueIn.get(block=False)
            res=doCalculation(par)
            queueOut.put((res))
            queueIn.task_done()
        except:
            break

def doCalculation(par):

    return par

def write(queue):
    while True:
        try:
            par=queue.get(block=False)
            print("response:",par)
        except:
            break


if __name__ == "__main__":

    nthreads = 2
    workerQueue = Queue()
    writerQueue = Queue()

    considerperiod=[1,2,3,4,5,6]

    feedProc = Process(target=feed, args=(workerQueue, considerperiod))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target=write, args=(writerQueue,))

    feedProc.start()
    feedProc.join()
    for p in calcProc:
        p.start()

    for p in calcProc:
        p.join()
    writProc.start()
    writProc.join()

在运行它打印的代码时,

$ python3 tst.py
Queue size 6
response: 1
response: 2

此外,是否可以确保写入功能始终输出1,2,3,4,5,6,即按照与馈送队列相同的顺序输出?

答案

错误是以某种方式与task_done()调用。如果你删除那个,那么它的工作原理,不要问我为什么(IMO这是一个错误)。但它的工作方式是queueIn.get(block=False)调用抛出异常,因为队列是空的。这可能仅适用于您的用例,但更好的方法是使用标记(如multiprocessing docs, see last example中所述)。这里有一点重写,所以你的程序使用了哨兵:

import os
import argparse
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep

def feed(queue, parlist, nthreads):
    for par in parlist:
        queue.put(par)
    for i in range(nthreads):
        queue.put(None)
    print("Queue size", queue.qsize())

def calc(queueIn, queueOut):
    while True:
        par=queueIn.get()
        if par is None:
            break
        res=doCalculation(par)
        queueOut.put((res))

def doCalculation(par):
    return par

def write(queue):
    while not queue.empty():
        par=queue.get()
        print("response:",par)


if __name__ == "__main__":
    nthreads = 2
    workerQueue = Queue()
    writerQueue = Queue()

    considerperiod=[1,2,3,4,5,6]

    feedProc = Process(target=feed, args=(workerQueue, considerperiod, nthreads))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target=write, args=(writerQueue,))

    feedProc.start()
    feedProc.join()
    for p in calcProc:
        p.start()

    for p in calcProc:
        p.join()
    writProc.start()
    writProc.join()

有几点需要注意:

  • 哨兵将None放入队列。请注意,每个工作进程都需要一个标记。
  • 对于write函数,你不需要进行哨兵处理,因为只有一个进程,你不需要处理并发性(如果你在empty()函数中做get()然后calc thingie你会遇到问题,如果例如,队列中只剩下一个项目,两个工人同时检查empty(),然后两个都想做get(),然后其中一个永久锁定)
  • 你不需要将feedwrite放入进程中,只需将它们放入主函数中,因为你不想并行运行它。

如何在输出中输出与输入相同的顺序? [...]我想multiprocessing.map可以做到这一点

是的map keeps the order。将程序重写为更简单的东西(因为您不需要workerQueuewriterQueue并添加随机睡眠以证明输出仍然有序:

from multiprocessing import Pool
import time
import random    

def calc(val):
    time.sleep(random.random())
    return val

if __name__ == "__main__":
    considerperiod=[1,2,3,4,5,6]
    with Pool(processes=2) as pool:
        print(pool.map(calc, considerperiod))

以上是关于Python多处理脚本部分输出的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 多处理进程中运行较慢的 OpenCV 代码片段

常用python日期日志获取内容循环的代码片段

从批处理文件运行多处理arcpy python

错误或功能?无法在python脚本中执行两个连续的多处理步骤

代码片段:Shell脚本实现重复执行和多进程

python-shell和多处理不打印