multiprocessing
Posted by 十年闷油瓶
Preface: this post, compiled by the editors at cha138.com, collects the classic demonstration scripts for Python's multiprocessing package; hopefully it is a useful reference. The scripts are the examples distributed with the Python 2.x documentation (copyright 2006-2008, R Oudkerk), which is why they use Python 2 idioms such as print statements, xrange, and the Queue module.
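Before diving into the full scripts, here is a minimal Python 3 sketch of the two entry points used throughout: Process for running a single child and Pool for a pool of workers. The function names (square, report) are invented for illustration and are not part of the original examples:

import multiprocessing

def square(x):
    # plain function executed in a child process
    return x * x

def report(name):
    print('hello from', name)

if __name__ == '__main__':      # required on platforms that spawn children
    # one-off child process
    p = multiprocessing.Process(target=report, args=('worker-1',))
    p.start()
    p.join()

    # pool of workers mapping a function over an iterable
    with multiprocessing.Pool(4) as pool:
        print(pool.map(square, range(10)))   # [0, 1, 4, ..., 81]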
Demonstration of how to create and use customized managers and proxies:

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()
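The same registration pattern works on Python 3, where generator proxies only need __next__ (the next() builtin calls it). Below is a minimal, hedged Python 3 sketch of exposing selected methods through a custom manager; the Counter class is invented for illustration:

from multiprocessing.managers import BaseManager

class Counter:
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
        return self.n
    def _reset(self):       # leading underscore: hidden unless explicitly exposed
        self.n = 0

class MyManager(BaseManager):
    pass

# only `increment` is callable through the proxy; `_reset` stays private
MyManager.register('Counter', Counter, exposed=('increment',))

if __name__ == '__main__':
    with MyManager() as manager:    # starts the manager's server process
        c = manager.Counter()
        print(c.increment())        # 1
        print(c.increment())        # 2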
Using Pool:

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')
    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()
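On Python 3 the Pool API is essentially unchanged; the visible differences are print()/range() and that Pool is now a context manager which calls terminate() on exit. A short sketch of the same apply_async/imap patterns, with cube as a stand-in task:

import multiprocessing

def cube(x):
    return x ** 3

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        # apply_async returns an AsyncResult; get() blocks, optionally with a timeout
        r = pool.apply_async(cube, (3,))
        print(r.get(timeout=5))                 # 27

        # imap yields results lazily in submission order;
        # imap_unordered yields them as they finish
        for y in pool.imap(cube, range(5), chunksize=2):
            print(y)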
Synchronization types like locks, conditions and queues:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'


#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print


#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)

    test(namespace)
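These synchronization primitives carry over to Python 3 unchanged (only Queue.Empty becomes queue.Empty). As a compact illustration of the Value-plus-Lock pattern from test_value(), here is a hedged Python 3 sketch; withdraw and the counts are invented for the example:

import multiprocessing

def withdraw(balance, lock):
    for _ in range(10000):
        with lock:                  # the Lock protects the shared counter
            balance.value -= 1

if __name__ == '__main__':
    balance = multiprocessing.Value('i', 20000)   # shared C int
    lock = multiprocessing.Lock()
    ps = [multiprocessing.Process(target=withdraw, args=(balance, lock))
          for _ in range(2)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(balance.value)            # 0 -- deterministic thanks to the lock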
An example showing how to use queues to feed tasks to a collection of worker processes and collect the results:

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were put on the
# input queue.  If it is important to get the results back in the
# original order then consider using `Pool.map()` or `Pool.imap()`
# (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()
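The STOP-sentinel idiom in worker() relies on iter(callable, sentinel), which keeps calling input.get() until it returns the sentinel. A condensed Python 3 sketch of the same task/result queue wiring; the names and workload are illustrative only:

from multiprocessing import Process, Queue

SENTINEL = 'STOP'

def worker(inq, outq):
    # iter(inq.get, SENTINEL) calls inq.get() repeatedly until the sentinel arrives
    for item in iter(inq.get, SENTINEL):
        outq.put(item * item)

if __name__ == '__main__':
    inq, outq = Queue(), Queue()
    workers = [Process(target=worker, args=(inq, outq)) for _ in range(3)]
    for w in workers:
        w.start()
    for i in range(9):
        inq.put(i)
    for _ in workers:               # one sentinel per worker
        inq.put(SENTINEL)
    results = sorted(outq.get() for _ in range(9))
    print(results)                  # [0, 1, 4, ..., 64], order restored by sorting
    for w in workers:
        w.join()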
An example of how a pool of worker processes can each run an HTTPServer instance (serving files via SimpleHTTPRequestHandler) while sharing a single listening socket:

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets picklable/inheritable


def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))


class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass


def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)


def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)


if __name__ == '__main__':
    freeze_support()
    test()
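On Python 3, BaseHTTPServer and SimpleHTTPServer were merged into http.server. Below is a hedged sketch of the same idea that assumes a fork-based start method (e.g. Linux, or multiprocessing.set_start_method('fork')), under which children inherit the listening socket; with the 'spawn' start method the server object cannot be shared this simply:

from http.server import HTTPServer, SimpleHTTPRequestHandler
from multiprocessing import Process, current_process

class Handler(SimpleHTTPRequestHandler):
    def log_message(self, fmt, *args):
        # tag each log line with the worker process that handled the request
        print('[%s] %s' % (current_process().name, fmt % args))

def serve(server):
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    server = HTTPServer(('localhost', 8000), Handler)
    # fork-based platforms only: children inherit the open listening socket
    for _ in range(3):
        Process(target=serve, args=(server,)).start()
    serve(server)   # main process also acts as a worker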
Some simple benchmarks comparing multiprocessing with threading:

#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed

####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()
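Two details of this harness are worth noting for Python 3: time.clock() was removed in 3.8, and time.perf_counter() is now the portable high-resolution timer on every platform. A hedged sketch of the same doubling-iterations technique applied to the lock benchmark; lockspeed and the one-second budget are illustrative choices:

import time
import threading
import multiprocessing

def lockspeed(lock, budget=1.0):
    # double the iteration count until a run takes at least `budget` seconds,
    # so the measured interval is long enough to be meaningful
    iterations, elapsed = 1, 0.0
    while elapsed < budget:
        iterations *= 2
        t = time.perf_counter()
        for _ in range(iterations):
            lock.acquire()
            lock.release()
        elapsed = time.perf_counter() - t
    return iterations / elapsed

if __name__ == '__main__':
    print('threading.Lock:       %.0f ops/sec' % lockspeed(threading.Lock()))
    print('multiprocessing.Lock: %.0f ops/sec' % lockspeed(multiprocessing.Lock()))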
That covers the essentials of multiprocessing. If it did not answer your question, these related posts may help:

multiprocessing: do maxtasksperchild and chunksize conflict?

How can I wait for all multiprocessing.Processes to finish before continuing?