python执行多进程时,如何获取函数返回的值
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python执行多进程时,如何获取函数返回的值相关的知识,希望对你有一定的参考价值。
p=Pool(20)
with open(temp_path,'r') as f:
for line in f.readlines():
p.apply_async(execute_monitor_sql,args=(type,)) --其中execute_monitor_sql函数会返回一个执行结果。我需要将这些结果汇总到一个参数
p.close()
p.join()
但执行的时候并没有返回任何参数
共享变量的方法。
没有办法直接实现你的需求,但是,你可以用共享变量的方法实现,比如:
def worker(procnum, return_dict):
'''worker function'''
print str(procnum) + ' represent!'
return_dict[procnum] = procnumif __name__ == '__main__':
manager = Manager()
return_dict = manager.dict()
jobs = [] for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,return_dict))
jobs.append(p)
p.start() for proc in jobs:
proc.join() print return_dict.values()
17.2.1.3. Exchanging objects between processes
multiprocessing supports two types of communication
channel between processes:
Queues
The Queue
class is a near clone of queue.Queue. For example:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Queues are thread and process safe.
Pipes
The Pipe() function returns a pair of connection objects
connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
The two connection objects returned by Pipe()
represent the two ends of the pipe. Each connection object has send()
and recv()
methods (among others). Note that data in a pipe may become corrupted if two
processes (or threads) try to read from or write to the same end of the
pipe at the same time. Of course there is no risk of corruption from processes
using different ends of the pipe at the same time.
Python:进程
多任务实现的3种方式:
多进程模式
多线程模式
多进程+多线程模式
线程是最小的执行单元,而进程由至少一个线程组成。
多进程(multiprocessing)
Unix/linux用fork()函数,调用一次返回两个值,子进程永远返回0,夫进程返回子进程的ID。
子进程调用getppid()函数就可以拿到夫进程的ID。
Unix/Linux创建子进程
import os print("Process(%s) start..." %os.getpip()) pid = os.fork() #windows上没有fork调用 if pid == 0: print("I am child process (%s) and my parent is %s" %(os.getpid(), os.getppid())) else: print("I (%s) just created a child process (%s)" %(os.getpid(),pid))
multiprocessing:跨平台多进程
multiprocessing模块提供了Process类来代表一个进程对象
from multiprocessing import Process import os def run_proc(name): print("Run child process %s(%s)" %(name, os.getpid())) if __name__ == "__main__": print("Parent process %s." %os.getpid()) p = Process(target=run_proc, args=(‘test‘,)) #创建子进程,传入一个执行函数和执行函数的参数 print("Child process will start.") p.start() #启动子进程 p.join() #等待子进程结束再继续往下运行,用于进程间的同步 print("Child process end.")
执行结果:
Parent process 928. Process will start. Run child process test (929)... Process end.
Pool:进程池,批量创建子进程
from multiprocessing import Pool import os, time, random def long_time_task(name): print("Run task %s(%s)..." %(name, os.getpid())) start = time.time() time.sleep(random.random()*3) end = time.time() print("Task %s runs %0.2f seconds." %(name, (end-start))) if __name__ == "__main__": print("Parent process %s." %os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i, )) print("Waiting for all subprocess done...") p.close() p.join() print("All subprocess done.")
执行结果:
Parent process 4756. Waiting for all subprocess done... Run task 0(3920)... Run task 1(1688)... Run task 2(2996)... Run task 3(3132)... Task 1 runs 0.32 seconds. Run task 4(1688)... Task 2 runs 0.61 seconds. Task 3 runs 1.41 seconds. Task 4 runs 2.26 seconds. Task 0 runs 2.89 seconds. All subprocess done.
分析:
对Pool
对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
请注意输出的结果,task 0
,1
,2
,3
是立刻执行的,而task 4
要等待前面某个task完成后才执行,这是因为Pool
的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool
有意设计的限制,并不是操作系统的限制
子进程(subprocess)
subprocess
模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出
import subprocess #下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的: print("$ nslookup www.python.rog") r = subprocess.call(["nslookup", "www.python.org"]) print("Exit code:", r)
运行结果:
$ nslookup www.python.org Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: www.python.org canonical name = python.map.fastly.net. Name: python.map.fastly.net Address: 199.27.79.223 Exit code: 0
如果子进程还需要输入,则可通过communicate()方法输入:
import subprocess print(‘$ nslookup‘) p = subprocess.Popen([‘nslookup‘], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b‘set q=mx\npython.org\nexit\n‘) print(output.decode(‘utf-8‘)) print(‘Exit code:‘, p.returncode)
进程间通信
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据
以Queue
为例,在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据:
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print(‘Process to write: %s‘ % os.getpid()) for value in [‘A‘, ‘B‘, ‘C‘]: print(‘Put %s to queue...‘ % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print(‘Process to read: %s‘ % os.getpid()) while True: value = q.get(True) print(‘Get %s from queue.‘ % value) if __name__==‘__main__‘: # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate()
执行结果:
Process to write: 3872 Put A to queue... Process to read: 4756 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
以上是关于python执行多进程时,如何获取函数返回的值的主要内容,如果未能解决你的问题,请参考以下文章