How to get a function's return value when running multiprocessing in Python


p = Pool(20)
with open(temp_path, 'r') as f:
    for line in f.readlines():
        p.apply_async(execute_monitor_sql, args=(type,))  # execute_monitor_sql returns a result; I need to collect all of these results into one variable
p.close()
p.join()

But when this runs, no return values come back at all.

Use a shared variable.

There is no way to do this directly, but you can achieve it with a shared variable, for example:

import multiprocessing
from multiprocessing import Manager

def worker(procnum, return_dict):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return_dict[procnum] = procnum

if __name__ == '__main__':
    manager = Manager()
    return_dict = manager.dict()  # a dict shared across processes
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    print(return_dict.values())

Another answer: a Queue is generally used to exchange data between processes.

17.2.1.3. Exchanging objects between processes

multiprocessing supports two types of communication
channel between processes:

Queues

The Queue
class is a near clone of queue.Queue. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.

Pipes

The Pipe() function returns a pair of connection objects
connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

The two connection objects returned by Pipe()
represent the two ends of the pipe. Each connection object has send()
and recv()
methods (among others). Note that data in a pipe may become corrupted if two
processes (or threads) try to read from or write to the same end of the
pipe at the same time. Of course there is no risk of corruption from processes
using different ends of the pipe at the same time.

Python: Processes

There are three ways to implement multitasking:

  multiprocess mode

  multithread mode

  multiprocess + multithread mode

A thread is the smallest unit of execution, and a process consists of at least one thread.

Multiprocessing (multiprocessing)

On Unix/Linux, fork() is called once but returns twice: the child process always gets 0, while the parent gets the child's PID.

A child process can call os.getppid() to obtain its parent's PID.

Creating a child process on Unix/Linux

import os

print("Process (%s) start..." % os.getpid())

pid = os.fork()  # Windows has no fork() call
if pid == 0:
    print("I am child process (%s) and my parent is %s" % (os.getpid(), os.getppid()))
else:
    print("I (%s) just created a child process (%s)" % (os.getpid(), pid))

multiprocessing: cross-platform multiprocessing

The multiprocessing module provides the Process class to represent a process object:

from multiprocessing import Process
import os

def run_proc(name):
    print("Run child process %s (%s)..." % (name, os.getpid()))

if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Process(target=run_proc, args=('test',))  # create a child process: pass the target function and its arguments
    print("Child process will start.")
    p.start()  # start the child process
    p.join()   # wait for the child to finish before continuing; used for inter-process synchronization
    print("Child process end.")

Output:

Parent process 928.
Child process will start.
Run child process test (929)...
Child process end.

Pool: a process pool for creating child processes in batches

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print("Run task %s(%s)..." %(name, os.getpid()))
    start = time.time()
    time.sleep(random.random()*3)
    end = time.time()
    print("Task %s runs %0.2f seconds." %(name, (end-start)))

if __name__ == "__main__":
    print("Parent process %s." %os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i, ))
    print("Waiting for all subprocess done...")
    p.close()
    p.join()
    print("All subprocess done.")

Output:

Parent process 4756.
Waiting for all subprocess done...
Run task 0(3920)...
Run task 1(1688)...
Run task 2(2996)...
Run task 3(3132)...
Task 1 runs 0.32 seconds.
Run task 4(1688)...
Task 2 runs 0.61 seconds.
Task 3 runs 1.41 seconds.
Task 4 runs 2.26 seconds.
Task 0 runs 2.89 seconds.
All subprocess done.

Analysis:

Calling join() on a Pool waits for all child processes to finish; close() must be called before join(), and once close() has been called no new Process can be added.

Note the output: tasks 0, 1, 2 and 3 start immediately, while task 4 has to wait for one of the earlier tasks to finish, because the Pool here has a size of 4, so at most 4 processes run at once. This is a deliberate limit of Pool, not a limit of the operating system.

Child processes (subprocess)

The subprocess module makes it very easy to start a child process and then control its input and output.

import subprocess

# The following example runs the command "nslookup www.python.org" from Python code,
# with the same effect as running it directly on the command line:
print("$ nslookup www.python.org")
r = subprocess.call(["nslookup", "www.python.org"])
print("Exit code:", r)

Output:

$ nslookup www.python.org
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
www.python.org    canonical name = python.map.fastly.net.
Name:    python.map.fastly.net
Address: 199.27.79.223

Exit code: 0
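As a side note, on Python 3.5+ subprocess.run() is the recommended interface and can also capture the command's output (capture_output needs 3.7+). A sketch, using sys.executable just to have a command that is guaranteed to exist:

```python
import subprocess
import sys

# run() returns a CompletedProcess with returncode and captured streams
r = subprocess.run([sys.executable, '--version'],
                   capture_output=True, text=True)
print('Exit code:', r.returncode)
print(r.stdout or r.stderr)  # some Python versions print the version to stderr
```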

If the child process also needs input, it can be supplied through the communicate() method:

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

Inter-process communication

Processes certainly need to communicate, and the operating system provides many mechanisms for it. Python's multiprocessing module wraps the low-level mechanisms and provides Queue, Pipes and other ways to exchange data.

Taking Queue as an example, create two child processes in the parent: one writes data into the Queue, the other reads data out of it:

from multiprocessing import Process, Queue
import os, time, random

# code run by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# code run by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    # the parent creates the Queue and passes it to each child process:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # start the writer pw:
    pw.start()
    # start the reader pr:
    pr.start()
    # wait for pw to finish:
    pw.join()
    # pr runs an infinite loop and can never be joined; terminate it instead:
    pr.terminate()

Output:

Process to write: 3872
Put A to queue...
Process to read: 4756
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

