multiprocessing

Posted 十年闷油瓶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocessing相关的知识,希望对你有一定的参考价值。

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    """Plain class whose methods are called through manager proxies.

    Each method only prints a message naming itself, so the output shows
    which methods a given proxy was able to reach.
    """

    def f(self):
        """Print a message identifying `Foo.f`."""
        print('you called Foo.f()')

    def g(self):
        """Print a message identifying `Foo.g`."""
        print('you called Foo.g()')

    def _h(self):
        """Print a message identifying `Foo._h`."""
        print('you called Foo._h()')

# A simple generator function
def baz():
    """Yield the squares of the integers 0 through 9, in order."""
    # `range` yields the same ten values as the Python-2-only `xrange`
    # and exists on every Python version.
    for i in range(10):
        yield i * i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    """Proxy that makes a generator living in the manager process iterable.

    Both iterator spellings (`next` and `__next__`) are exposed, and each
    is forwarded to the referent via `_callmethod`.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        """Return the proxy itself so it can be used in a `for` loop."""
        return self

    def next(self):
        """Forward a `next` call to the referent."""
        return self._callmethod('next')

    def __next__(self):
        """Forward a `__next__` call to the referent."""
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    """Return the standard `operator` module object."""
    module = operator
    return module

##

class MyManager(BaseManager):
    """Manager subclass that the example's callables are registered on."""

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    """Start a `MyManager` server and call every registered type via proxy.

    Prints the output of each call and asserts which methods each proxy
    exposes.  Raises `AssertionError` if a proxy exposes the wrong set.
    """
    manager = MyManager()
    manager.start()

    print('-' * 20)

    # 'Foo1' was registered without `exposed`: only f() and g() are reachable.
    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    # 'Foo2' was registered with exposed=('g', '_h').
    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    # Iterate the remote generator through GeneratorProxy; all values are
    # printed on one line separated by single spaces.
    it = manager.baz()
    print(' '.join('<%d>' % i for i in it))

    print('-' * 20)

    # One-element tuples keep `%` from unpacking a tuple-valued result
    # (e.g. `_exposed_`).
    op = manager.operator()
    print('op.add(23, 45) = %s' % (op.add(23, 45),))
    print('op.pow(2, 94) = %s' % (op.pow(2, 94),))
    print('op.getslice(range(10), 2, 6) = %s' %
          (op.getslice(range(10), 2, 6),))
    print('op.repeat(range(5), 3) = %s' % (op.repeat(range(5), 3),))
    print('op._exposed_ = %s' % (op._exposed_,))

##

# Run the demonstration only when executed as a script.
if __name__ == '__main__':
    freeze_support()
    test()
Using Pool:

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    """Call `func(*args)` and return a sentence describing the outcome.

    The returned string has the form
    '<current process name> says that <func name><args> = <result>'.
    """
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    """Unpack `args` into positional arguments for `calculate`.

    Lets a single-argument API such as `Pool.imap` drive `calculate`.
    """
    outcome = calculate(*args)
    return outcome

def mul(a, b):
    """Sleep for a random time below half a second, then return `a * b`."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    product = a * b
    return product

def plus(a, b):
    """Sleep for a random time below half a second, then return `a + b`."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    total = a + b
    return total

def f(x):
    """Return 1 / (x - 5); raises `ZeroDivisionError` when `x` equals 5."""
    denominator = x - 5.0
    return 1.0 / denominator

def pow3(x):
    """Return `x` raised to the third power."""
    return pow(x, 3)

def noop(x):
    """Ignore `x` and return `None`."""
    return None

#
# Test code
#

def test():
    """Exercise `multiprocessing.Pool` end to end and print what happens.

    Covers, in order: ordered and unordered result retrieval, timing
    comparisons against the built-in `map`, error propagation, timeouts,
    callbacks, and the `close()`, `terminate()` and garbage-collection
    shutdown paths.  A failed check raises `AssertionError`.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print('')

    #
    # Tests
    #

    TASKS = ([(mul, (i, 7)) for i in range(10)] +
             [(plus, (i, 8)) for i in range(10)])

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t %s' % (r.get(),))
    print('')

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t %s' % (x,))
    print('')

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t %s' % (x,))
    print('')

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t %s' % (x,))
    print('')

    #
    # Simple benchmarks
    #

    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = map(pow3, xrange(N))
    print('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print('')

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = map(noop, L)
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print('')

    # Drop the large lists before the remaining tests.
    del A, B, C, L

    #
    # Test error handling
    #

    print('Testing error handling:')

    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, range(10)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            it.next()
        except ZeroDivisionError:
            # f() divides by zero only for x == 5; the error is swallowed
            # so iteration continues with the remaining items.
            pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The loop must have consumed all ten items without an early break.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print('')

    #
    # Testing timeouts
    #

    # Written without a newline so the progress dots follow on the same line.
    sys.stdout.write('Testing ApplyResult.get() with timeout:')
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    sys.stdout.write('Testing IMapIterator.next() with timeout:')
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    #
    # Testing callback
    #

    print('Testing callback:')

    A = []
    # Expected contents of A: mul(7, 8) followed by pow3(0) .. pow3(9).
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')

    #
    # Check terminate() method
    #

    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')

    #
    # Check garbage collection
    #

    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Keep the worker list so liveness can still be checked after the
    # pool object itself has been released.
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    # Release every reference to the pool and its pending results.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')


# Script entry point: an optional argument picks processes (default) or
# threads as the pool implementation.
if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print(' Using processes '.center(79, '-'))
    elif sys.argv[1] == 'threads':
        print(' Using threads '.center(79, '-'))
        # Rebind the module-level name so test() uses the thread-based API.
        import multiprocessing.dummy as multiprocessing
    else:
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    test()
Synchronization types like locks, conditions and queues:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    """Sleep for a random time of up to 4 seconds, then decrement `running`.

    `running` must expose a mutable `.value`; `mutex` guards both the
    decrement and the accompanying message.
    """
    random.seed()
    time.sleep(random.random()*4)

    # `with` releases the lock even if printing or the update raises.
    with mutex:
        print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
        running.value -= 1

def test_value():
    """Start ten `value_func` processes and poll a shared counter to zero.

    The counter starts at the number of tasks; each child decrements it
    once.  The parent prints the current value roughly every 0.08 seconds.
    """
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        with mutex:
            # Values stay on one line, separated by spaces.
            sys.stdout.write('%s ' % running.value)
            sys.stdout.flush()

    print('')
    print('No more running processes')


#### TEST_QUEUE

def queue_func(queue):
    """Put the squares of 0..29 on `queue`, then the sentinel 'STOP'.

    A random pause of up to half a second precedes each item.
    """
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    """Read items produced by `queue_func` until the 'STOP' sentinel.

    Each `get` waits at most 0.3 seconds; a timeout prints 'TIMEOUT' and
    the loop retries.
    """
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            sys.stdout.write('%s ' % (o,))
            sys.stdout.flush()
        except Empty:
            print('TIMEOUT')

    print('')


#### TEST_CONDITION

def condition_func(cond):
    """Acquire `cond`, wait two seconds, then notify one waiter.

    The condition is printed before the sleep and again just before the
    notification.
    """
    # `with` releases the condition even if something in the body raises.
    with cond:
        print('\t' + str(cond))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t' + str(cond))
        cond.notify()

def test_condition():
    """Wait on a condition that a child process running `condition_func` notifies.

    The parent acquires the condition twice before starting the child and
    prints the condition after every acquire/release step.
    """
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print(cond)

    # Acquire/release are kept explicit (no `with`): the condition is
    # printed between each nested step.
    cond.acquire()
    print(cond)
    cond.acquire()
    print(cond)

    p.start()

    print('main is waiting')
    cond.wait()
    print('main has woken up')

    print(cond)
    cond.release()
    print(cond)
    cond.release()

    p.join()
    print(cond)


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    """Hold `sema` for a random time of up to 2 seconds, tracking a counter.

    `running.value` is incremented on entry and decremented on exit, each
    under `mutex`, and a message is printed at both points.
    """
    # Context managers release the semaphore and mutex on any exit path.
    with sema:
        with mutex:
            running.value += 1
            print('%s tasks are running' % running.value)

        random.seed()
        time.sleep(random.random()*2)

        with mutex:
            running.value -= 1
            print('%s has finished' % multiprocessing.current_process())

def test_semaphore():
    """Run ten `semaphore_func` processes against a semaphore of size 3."""
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    """Announce, sleep for 5.5 seconds, then announce termination."""
    print('\tchild sleeping')
    time.sleep(5.5)
    print('\n\tchild terminating')

def test_join_timeout():
    """Join a slow child in one-second slices, printing a dot per timeout."""
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print('waiting for process to finish')

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        # Progress dots stay on one line.
        sys.stdout.write('. ')
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    """Block until `event` is set, printing a message before and after."""
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()
    print('\t%r has woken up' % multiprocessing.current_process())

def test_event():
    """Start five processes waiting on one event, set it after 2 seconds."""
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Check that shared objects hold the data they were built from.

    `values` and `arrays` are sequences of (typecode, data) pairs;
    `shared_values[i].value` must equal the data of `values[i]`, and
    `list(shared_arrays[i][:])` must equal the data of `arrays[i]`.
    Raises `AssertionError` on a mismatch; prints 'Tests passed' otherwise.
    """
    for i, (_typecode, expected) in enumerate(values):
        assert expected == shared_values[i].value

    # Iterate over `arrays` itself; the original reused len(values) here,
    # which is only correct when both lists have the same length.
    for i, (_typecode, expected) in enumerate(arrays):
        assert expected == list(shared_arrays[i][:])

    print('Tests passed')

def test_sharedvalues():
    """Build shared values/arrays and verify them in a child process.

    The child runs `sharedvalues_func`; an exit code of 0 means every
    comparison succeeded.
    """
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    # Real lists, so the child's `==` comparison against a list holds
    # even where `range` does not return one.
    arrays = [
        ('i', list(range(100))),
        ('d', [0.25 * i for i in range(100)]),
        ('H', list(range(1000)))
        ]

    # `typecode` rather than `id`, which would shadow the builtin.
    shared_values = [multiprocessing.Value(typecode, v)
                     for typecode, v in values]
    shared_arrays = [multiprocessing.Array(typecode, a)
                     for typecode, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    """Run every synchronization test against the API found in `namespace`.

    `namespace` replaces the module-level `multiprocessing` name, so the
    test functions pick up whichever implementation was passed in.
    Raises `ValueError` if `namespace._debug_info()` exists and reports
    anything once the tests have finished.
    """
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print('\n\t######## %s\n' % func.__name__)
        func()

    multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print(info)
            raise ValueError('there should be no positive refcounts left')


# Script entry point: an optional argument selects plain processes
# (default), a manager, or threads as the API under test.
if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print(' Using processes '.center(79, '-'))
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print(' Using processes and a manager '.center(79, '-'))
        namespace = multiprocessing.Manager()
        # Attach the module-level helpers the tests look up on `namespace`.
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print(' Using threads '.center(79, '-'))
        import multiprocessing.dummy as namespace
    else:
        print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
        raise SystemExit(2)

    test(namespace)
An example showing how to use queues to feed tasks to a collection of worker processes and collect the results:

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    """Process (func, args) tasks from `input` until 'STOP' is received.

    Each task is run through `calculate` and the resulting string is put
    on `output`.
    """
    # iter(callable, sentinel) keeps calling input.get() until it
    # returns the sentinel 'STOP'.
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    """Call `func(*args)` and return a sentence describing the outcome.

    The returned string has the form
    '<current process name> says that <func name><args> = <result>'.
    """
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    """Sleep for a random time below half a second, then return `a * b`."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    product = a * b
    return product

def plus(a, b):
    """Sleep for a random time below half a second, then return `a + b`."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    total = a + b
    return total

#
#
#

def test():
    """Feed two batches of tasks to four worker processes via queues.

    Results are printed in the order they arrive on the output queue.
    One 'STOP' sentinel per worker is queued at the end.
    """
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for _ in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for _ in range(len(TASKS1)):
        print('\t %s' % (done_queue.get(),))

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for _ in range(len(TASKS2)):
        print('\t %s' % (done_queue.get(),))

    # Tell child processes to stop
    for _ in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


# Run the demonstration only when executed as a script.
if __name__ == '__main__':
    freeze_support()
    test()
An example of how a pool of worker processes can each run a BaseHTTPServer.HTTPServer instance (serving requests with SimpleHTTPServer.SimpleHTTPRequestHandler) while sharing a single listening socket.

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets picklable/inheritable


def note(format, *args):
    """Write `format % args` to stderr, prefixed with the process name.

    The output line has the form '[<process name>]<TAB><message>'.
    """
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))


class RequestHandler(SimpleHTTPRequestHandler):
    """Request handler whose log lines go through `note`."""

    def log_message(self, format, *args):
        """Route the log message to `note`, which tags it with the process name."""
        note(format, *args)

def serve_forever(server):
    """Log a start message and run `server.serve_forever()`.

    Returns quietly when interrupted with `KeyboardInterrupt`.
    """
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the documented way to stop the example.
        pass


def runpool(address, number_of_processes):
    """Serve HTTP on `address` from `number_of_processes` processes.

    One `HTTPServer` is created up front and handed to every child; the
    calling process serves as well, so only `number_of_processes - 1`
    children are started.
    """
    # a single server object is built here -- each child gets its own copy
    server = HTTPServer(address, RequestHandler)

    # spawn the additional workers
    extra_workers = number_of_processes - 1
    for _ in range(extra_workers):
        child = Process(target=serve_forever, args=(server,))
        child.start()

    # the calling process takes the remaining worker slot
    serve_forever(server)


def test():
    """Serve the parent of this file's directory on localhost:8000.

    Prints the URL and the exit key combination, changes into the
    directory to serve, then runs a pool of four server processes.
    """
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print('Serving at http://%s:%d using %d worker processes' %
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES))
    # Conditional expression instead of indexing a list with a bool.
    exit_key = 'Break' if sys.platform == 'win32' else 'C'
    print('To exit press Ctrl-' + exit_key)

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)


# Run the demonstration only when executed as a script.
if __name__ == '__main__':
    freeze_support()
    test()
Some simple benchmarks comparing multiprocessing with threading:

#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == ‘win32‘:
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    """Producer side of the queue benchmark.

    Notifies condition `c` once, then puts a 256-character string on `q`
    `iterations` times, followed by the sentinel 'STOP'.
    """
    a = '0' * 256
    # Notify the parent, which is waiting on `c` before it starts timing.
    with c:
        c.notify()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    """Measure how fast objects pass through queue `q`.

    `Process` is the callable used to start the producer
    (`queuespeed_func`) and `c` is the condition it notifies on start-up.
    The iteration count doubles until draining the queue takes at least
    `delta`; the final count and rate are printed.
    """
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        # Hold `c` across start() so the producer's notify cannot happen
        # before this side is waiting.
        with c:
            p.start()
            c.wait()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print('%s objects passed through the queue in %s seconds' %
          (iterations, elapsed))
    print('average number/sec: %s' % (iterations/elapsed,))


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    """Producer side of the pipe benchmark.

    Notifies `cond` once, then sends a 256-character string over
    connection `c` `iterations` times, followed by the sentinel 'STOP'.
    """
    a = '0' * 256
    # Notify the parent, which is waiting on `cond` before it starts timing.
    with cond:
        cond.notify()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    """Measure how fast objects pass through a `multiprocessing.Pipe`.

    A child running `pipe_func` sends on one end while this process
    receives on the other.  The iteration count doubles until one run
    takes at least `delta`; the final count and rate are printed.
    """
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        # Hold `cond` across start() so the child's notify cannot happen
        # before this side is waiting.
        with cond:
            p.start()
            cond.wait()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print('%s objects passed through connection in %s seconds' %
          (iterations, elapsed))
    print('average number/sec: %s' % (iterations/elapsed,))


#### TEST_SEQSPEED

def test_seqspeed(seq):
    """Measure how fast `seq[5]` can be read.

    The iteration count doubles until one timed run takes at least
    `delta`; the final count and rate are printed.
    """
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]      # the indexing operation being timed

        elapsed = _timer()-t

    print('%s iterations in %s seconds' % (iterations, elapsed))
    print('average number/sec: %s' % (iterations/elapsed,))


#### TEST_LOCK

def test_lockspeed(l):
    """Measure how fast lock `l` can be acquired and released.

    The iteration count doubles until one timed run takes at least
    `delta`; the final count and rate are printed.
    """
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        # Explicit acquire/release: these two calls are what is timed.
        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print('%s iterations in %s seconds' % (iterations, elapsed))
    print('average number/sec: %s' % (iterations/elapsed,))


#### TEST_CONDITION

def conditionspeed_func(c, N):
    """Partner side of the wait/notify exchange timed by `test_conditionspeed`.

    Acquires condition `c`, notifies once, then performs `N` rounds of
    wait-then-notify before releasing `c`.
    """
    c.acquire()
    # First notify answers the `c.wait()` the caller performs before timing.
    c.notify()

    for i in xrange(N):
        # Each round waits for the caller's notify, then answers it.
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    """Measure how fast two parties can alternate wait/notify on `c`.

    `Process` is the callable used to start the partner
    (`conditionspeed_func`).  The iteration count doubles until one timed
    run takes at least `delta`; twice the final count is reported as the
    number of waits.
    """
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        # Acquire/wait/notify order is left exactly as written; the two
        # sides depend on it.
        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        # Wait for the partner's initial notify before starting the clock.
        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print('%s waits in %s seconds' % (iterations * 2, elapsed))
    print('average number/sec: %s' % (iterations * 2 / elapsed,))

####

def test():
    """Run every benchmark against threading, multiprocessing and manager objects.

    Garbage collection is disabled while the benchmarks run and is
    re-enabled afterwards, even if one of them raises.
    """
    manager = multiprocessing.Manager()

    gc.disable()
    try:
        print('\n\t######## testing Queue.Queue\n')
        test_queuespeed(threading.Thread, Queue.Queue(),
                        threading.Condition())
        print('\n\t######## testing multiprocessing.Queue\n')
        test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                        multiprocessing.Condition())
        print('\n\t######## testing Queue managed by server process\n')
        test_queuespeed(multiprocessing.Process, manager.Queue(),
                        manager.Condition())
        print('\n\t######## testing multiprocessing.Pipe\n')
        test_pipespeed()

        print('')

        print('\n\t######## testing list\n')
        test_seqspeed(range(10))
        print('\n\t######## testing list managed by server process\n')
        test_seqspeed(manager.list(range(10)))
        print('\n\t######## testing Array("i", ..., lock=False)\n')
        test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
        print('\n\t######## testing Array("i", ..., lock=True)\n')
        test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

        print('')

        print('\n\t######## testing threading.Lock\n')
        test_lockspeed(threading.Lock())
        print('\n\t######## testing threading.RLock\n')
        test_lockspeed(threading.RLock())
        print('\n\t######## testing multiprocessing.Lock\n')
        test_lockspeed(multiprocessing.Lock())
        print('\n\t######## testing multiprocessing.RLock\n')
        test_lockspeed(multiprocessing.RLock())
        print('\n\t######## testing lock managed by server process\n')
        test_lockspeed(manager.Lock())
        print('\n\t######## testing rlock managed by server process\n')
        test_lockspeed(manager.RLock())

        print('')

        print('\n\t######## testing threading.Condition\n')
        test_conditionspeed(threading.Thread, threading.Condition())
        print('\n\t######## testing multiprocessing.Condition\n')
        test_conditionspeed(multiprocessing.Process,
                            multiprocessing.Condition())
        print('\n\t######## testing condition managed by a server process\n')
        test_conditionspeed(multiprocessing.Process, manager.Condition())
    finally:
        # Restore garbage collection even when a benchmark fails.
        gc.enable()

# Run the benchmarks only when executed as a script.
if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

  

以上是关于multiprocessing的主要内容,如果未能解决你的问题,请参考以下文章

multiprocessing:maxtasksperchild和chunksize冲突?

如何在继续之前等待所有 multiprocessing.Processes 完成?

进程之multiprocessing模块代码篇

multiprocessing_generator 模块触发权限错误

multiprocessing模块

如何在 Python 中使用 multiprocessing.pool 创建全局锁/信号量?