Python多进程multiprocessing使用示例

Posted 水·域

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多进程multiprocessing使用示例相关的知识,希望对你有一定的参考价值。

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

import multiprocessing

def worker(num):
    """thread worker function"""
    print \'Worker:\', num
    return

if __name__ == \'__main__\':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
简单的创建进程

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print name, \'Starting\'
    time.sleep(2)
    print name, \'Exiting\'

def my_service():
    name = multiprocessing.current_process().name
    print name, \'Starting\'
    time.sleep(3)
    print name, \'Exiting\'

if __name__ == \'__main__\':
    service = multiprocessing.Process(name=\'my_service\',
                                      target=my_service)
    worker_1 = multiprocessing.Process(name=\'worker 1\',
                                       target=worker)
    worker_2 = multiprocessing.Process(target=worker) # default name

    worker_1.start()
    worker_2.start()
    service.start()
View Code

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

import multiprocessing
import time
import sys

def daemon():
    name = multiprocessing.current_process().name
    print \'Starting:\', name
    time.sleep(2)
    print \'Exiting :\', name

def non_daemon():
    name = multiprocessing.current_process().name
    print \'Starting:\', name
    print \'Exiting :\', name

if __name__ == \'__main__\':
    d = multiprocessing.Process(name=\'daemon\',
                                target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name=\'non-daemon\',
                                target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print \'d.is_alive()\', d.is_alive()
    n.join()
守护进程

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

import multiprocessing
import time

def slow_worker():
    print \'Starting worker\'
    time.sleep(0.1)
    print \'Finished worker\'

if __name__ == \'__main__\':
    p = multiprocessing.Process(target=slow_worker)
    print \'BEFORE:\', p, p.is_alive()

    p.start()
    print \'DURING:\', p, p.is_alive()

    p.terminate()
    print \'TERMINATED:\', p, p.is_alive()

    p.join()
    print \'JOINED:\', p, p.is_alive()
终止进程
  1. == 0 未生成任何错误  
  2. 0 进程有一个错误,并以该错误码退出
  3. < 0 进程由一个-1 * exitcode信号结束
import multiprocessing
import sys
import time

def exit_error():
    sys.exit(1)

def exit_ok():
    return

def return_value():
    return 1

def raises():
    raise RuntimeError(\'There was an error!\')

def terminated():
    time.sleep(3)

if __name__ == \'__main__\':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        print \'Starting process for\', f.func_name
        j = multiprocessing.Process(target=f, name=f.func_name)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print \'%15s.exitcode = %s\' % (j.name, j.exitcode)
进程的退出状态

方便的调试,可以用logging

import multiprocessing
import logging
import sys

def worker():
    print \'Doing some work\'
    sys.stdout.flush()

if __name__ == \'__main__\':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
日志

利用class来创建进程,定制子类

import multiprocessing

class Worker(multiprocessing.Process):

    def run(self):
        print \'In %s\' % self.name
        return

if __name__ == \'__main__\':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
派生进程
import multiprocessing

class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print \'Doing something fancy in %s for %s!\' % \\
            (proc_name, self.name)

def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == \'__main__\':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass(\'Fancy Dan\'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print \'%s: Exiting\' % proc_name
                self.task_queue.task_done()
                break
            print \'%s: %s\' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return \'%s * %s = %s\' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return \'%s * %s\' % (self.a, self.b)

if __name__ == \'__main__\':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print \'Creating %d consumers\' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print \'Result:\', result
        num_jobs -= 1
python进程间传递消息

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print \'wait_for_event: starting\'
    e.wait()
    print \'wait_for_event: e.is_set()->\', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print \'wait_for_event_timeout: starting\'
    e.wait(t)
    print \'wait_for_event_timeout: e.is_set()->\', e.is_set()

if __name__ == \'__main__\':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name=\'block\', 
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name=\'nonblock\', 
                                 target=wait_for_event_timeout, 
                                 args=(e, 2))
    w2.start()

    print \'main: waiting before calling Event.set()\'
    time.sleep(3)
    e.set()
    print \'main: event is set\'
进程间信号传递

Python多进程,一般的情况是Queue来传递。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, \'hello\'])

if __name__ == \'__main__\':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, \'hello\']"
    p.join()
Queue
import Queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name

def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (threadName, data)
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
    pass

# Notify threads it\'s time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print "Exiting Main Thread"
多线程优先队列Queue

多进程使用Queue通信的例子

import time
from multiprocessing import Process,Queue

MSG_QUEUE = Queue(5)

def startA(msgQueue):
    while True:
        if msgQueue.empty() > 0:
            print (\'queue is empty %d\' % (msgQueue.qsize()))
        else:
            msg = msgQueue.get()
            print( \'get msg %s\' % (msg,))
        time.sleep(1)

def startB(msgQueue):
    while True:
        msgQueue.put(\'hello world\')
        print( \'put hello world queue size is %d\' % (msgQueue.qsize(),))
        time.sleep(3)

if __name__ == \'__main__\':
    processA = Process(target=startA,args=(MSG_QUEUE,))
    processB = Process(target=startB,args=(MSG_QUEUE,))

    processA.start()
    print( \'processA start..\')
View Code

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。

以上是关于Python多进程multiprocessing使用示例的主要内容,如果未能解决你的问题,请参考以下文章

python ---多进程 Multiprocessing

python多进程-----multiprocessing包

python多进程multiprocessing

python 多进程multiprocessing 模块

python 3 编程之多进程 multiprocessing模块

Python多进程(multiprocessing)