Python多进程multiprocessing使用示例
Posted 水·域
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多进程multiprocessing使用示例相关的知识,希望对你有一定的参考价值。
multiprocessing简介
像管理线程一样管理进程,这是multiprocessing的核心。它与threading非常相似,但对多核CPU的利用率比threading好得多。
import multiprocessing


def worker(num):
    """Print the number this worker process was started with.

    Args:
        num: Value passed in by the parent through ``args``.
    """
    # Single-argument print() with %-formatting emits the same text on
    # Python 2 and Python 3.
    print('Worker: %s' % num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    # Wait for every child explicitly instead of relying on interpreter
    # shutdown to reap them.
    for p in jobs:
        p.join()
确定当前的进程,即是给进程命名,方便标识区分,跟踪
import multiprocessing
import time


def worker(delay=2):
    """Report the current process name on start and exit.

    Args:
        delay: Seconds to sleep between the two messages.
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(delay)
    print('%s Exiting' % name)


def my_service(delay=3):
    """Report the current process name on start and exit.

    Args:
        delay: Seconds to sleep between the two messages.
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(delay)
    print('%s Exiting' % name)


if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    worker_2 = multiprocessing.Process(target=worker)  # default name

    worker_1.start()
    worker_2.start()
    service.start()

    # Wait for all children so the parent outlives them explicitly.
    for proc in (worker_1, worker_2, service):
        proc.join()
守护进程不会阻止主程序退出,它在后台独立运行。在调用start()之前把进程对象的daemon属性设为True即可:p.daemon = True
如果需要等待守护进程退出,要调用join();join可以传入一个浮点数作为超时时间(秒),超过这个时间就不再等待。
import multiprocessing
import time
import sys


def daemon(delay=2):
    """Announce start, sleep, then announce exit of the current process.

    Args:
        delay: Seconds to sleep between the two messages.
    """
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    time.sleep(delay)
    print('Exiting : %s' % name)


def non_daemon():
    """Announce start and exit of the current process without pausing."""
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    print('Exiting : %s' % name)


if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    # Wait at most one second for the daemon; it sleeps for two, so it is
    # expected to still be alive when the timeout expires.
    d.join(1)
    print('d.is_alive() %s' % d.is_alive())
    n.join()
停止进程最好使用poison pill(毒丸)方式;如果要用terminate()强制终止,注意terminate之后要调用join(),使进程对象可以更新其状态。
import multiprocessing
import time


def slow_worker():
    """Print a start message, sleep briefly, then print a finish message."""
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE: %s %s' % (p, p.is_alive()))

    p.start()
    print('DURING: %s %s' % (p, p.is_alive()))

    p.terminate()
    print('TERMINATED: %s %s' % (p, p.is_alive()))

    # join() after terminate() lets the Process object refresh its state.
    p.join()
    print('JOINED: %s %s' % (p, p.is_alive()))
- == 0 未生成任何错误
- > 0 进程有一个错误,并以该错误码退出
- < 0 进程由一个-1 * exitcode信号结束
import multiprocessing
import sys
import time


def exit_error():
    """Exit the process with status code 1."""
    sys.exit(1)


def exit_ok():
    """Return normally without a value."""
    return


def return_value():
    """Return 1 from the target function."""
    return 1


def raises():
    """Raise a RuntimeError."""
    raise RuntimeError('There was an error!')


def terminated():
    """Sleep long enough for the parent to terminate this process."""
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        # __name__ exists on Python 2 and 3; func_name is Python 2 only.
        print('Starting process for %s' % f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    # Kill the last job (terminated) while it is still sleeping.
    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('%15s.exitcode = %s' % (j.name, j.exitcode))
方便的调试,可以用logging
import multiprocessing
import logging
import sys


def worker():
    """Print a message and flush stdout."""
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    # Send multiprocessing's internal log records to stderr at INFO level.
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
利用class来创建进程,定制子类
import multiprocessing


class Worker(multiprocessing.Process):
    """Process subclass whose work is defined by overriding run()."""

    def run(self):
        """Print the name of this process object."""
        print('In %s' % self.name)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()

    for j in jobs:
        j.join()
# --- Example 1: passing an object to a child through a Queue ---------------
import multiprocessing


class MyFancyClass(object):
    """Small object that is sent to the worker process through a queue."""

    def __init__(self, name):
        self.name = name

    def do_something(self):
        """Print the current process name together with this object's name."""
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in %s for %s!' % (proc_name, self.name))


def worker(q):
    """Take one object from *q* and call its do_something() method."""
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()


# --- Example 2: consumers fed from a JoinableQueue, stopped by poison pill --
import multiprocessing
import time


class Consumer(multiprocessing.Process):
    """Process that runs callables from task_queue until it receives None."""

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Call each task and put its return value on result_queue."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    """Callable that multiplies two values and returns a description."""

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    # range() replaces the Python 2-only xrange(); the counts here are small.
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result: %s' % result)
        num_jobs -= 1
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以在设置和未设置两种状态之间切换。通过使用一个可选的超时值,事件对象的用户可以等待其状态从未设置变为设置。
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()-> %s' % e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()-> %s' % e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()

    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')

    # Wait for both children so their output is complete before exit.
    w1.join()
    w2.join()
Python多进程之间一般使用Queue来传递数据。
from multiprocessing import Process, Queue


def f(q):
    """Put one sample list on the queue *q*."""
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
try:
    import queue  # Python 3 module name
except ImportError:
    import Queue as queue  # Python 2 module name
import threading
import time

# Set to a true value by the main thread to tell the workers to stop.
exitFlag = 0


class myThread(threading.Thread):
    """Thread that drains the queue it was given until exitFlag is set."""

    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q):
    """Take items from *q* one at a time and print them until exitFlag is set.

    Args:
        threadName: Label printed in front of each processed item.
        q: Queue to read from; access is serialised with queueLock.
    """
    while not exitFlag:
        # Test and read the same queue (the parameter) under the lock, so
        # the emptiness check and the get() cannot be split by another thread.
        with queueLock:
            has_item = not q.empty()
            if has_item:
                data = q.get()
        if has_item:
            print("%s processing %s" % (threadName, data))
        time.sleep(1)


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)

if __name__ == '__main__':
    threads = []
    threadID = 1

    # Create new threads
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1

    # Fill the queue
    with queueLock:
        for word in nameList:
            workQueue.put(word)

    # Wait for queue to empty; sleep briefly instead of spinning at 100% CPU.
    while not workQueue.empty():
        time.sleep(0.1)

    # Notify threads it's time to exit
    exitFlag = 1

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print("Exiting Main Thread")
多进程使用Queue通信的例子
import time
from multiprocessing import Process, Queue

MSG_QUEUE = Queue(5)


def startA(msgQueue, interval=1, rounds=None):
    """Consumer loop: report an empty queue or take and print one message.

    Args:
        msgQueue: Queue to read from.
        interval: Seconds to sleep after each round.
        rounds: Number of rounds to run; None (the default) loops forever.
    """
    # NOTE: qsize() can raise NotImplementedError on platforms such as
    # macOS, according to the multiprocessing documentation.
    done = 0
    while rounds is None or done < rounds:
        if msgQueue.empty():
            print('queue is empty %d' % (msgQueue.qsize()))
        else:
            msg = msgQueue.get()
            print('get msg %s' % (msg,))
        time.sleep(interval)
        done += 1


def startB(msgQueue, interval=3, rounds=None):
    """Producer loop: put 'hello world' on the queue and print its size.

    Args:
        msgQueue: Queue to write to.
        interval: Seconds to sleep after each round.
        rounds: Number of rounds to run; None (the default) loops forever.
    """
    done = 0
    while rounds is None or done < rounds:
        msgQueue.put('hello world')
        print('put hello world queue size is %d' % (msgQueue.qsize(),))
        time.sleep(interval)
        done += 1


if __name__ == '__main__':
    processA = Process(target=startA, args=(MSG_QUEUE,))
    processB = Process(target=startB, args=(MSG_QUEUE,))

    processA.start()
    print('processA start..')
    # The producer must be started too, otherwise the consumer only ever
    # sees an empty queue.
    processB.start()
    print('processB start..')
主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。
以上是关于Python多进程multiprocessing使用示例的主要内容,如果未能解决你的问题,请参考以下文章
python多进程-----multiprocessing包