多进程 — 信号传递与进程控

Posted 有雨敲窗2017

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多进程 — 信号传递与进程控相关的知识,希望对你有一定的参考价值。

内容目录:

  1. multiprocessing.Queue()
  2. JoinableQueue
  3. 进程间的信号传递 Event
  4. 控制对资源的访问 Lock
  5. 同步操作 Condition
  6. 控制对资源的并发访问 Semaphore
  7. 管理共享状态 Manager
  8. 共享命名空间 mgr.Namespace()
  9. 进程池 multiprocessing.Pool

1. multiprocessing.Queue()

和线程一样,多进程的一个常见的使用模式是将一个任务划分为几个worker,以便并行运行。有效地使用多进程通常需要它们之间的一些通信,这样工作就可以被分割,结果可以被聚合。一种简单方法是使用队列multiprocessing.Queue()来回传递消息。任何可以用pickle序列化的对象都可以通过队列。

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print(‘Doing something fancy in {} for {}!‘.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()



if __name__ == ‘__main__‘:
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass(‘Fancy Dan‘))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

结果:当q是空的时候,q.get()会等。

Doing something fancy in Process-1 for Fancy Da

2. JoinableQueue

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

使用None这个特殊值来判断是否结束Worker

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print(‘{}: Exiting‘.format(proc_name))
                self.task_queue.task_done()
                break
            # next_task是Task()的一个实例,打印next_task会输出__str__
            print(‘{}: {}‘.format(proc_name, next_task))
            # 执行next_task()会执行__call__
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return {self.a} * {self.b} = {product}.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return {self.a} * {self.b}.format(self=self)


if __name__ == ‘__main__‘:
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print(‘Creating {} consumers‘.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print(‘Result:‘, result)
        num_jobs -= 1

执行结果:

Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81

3. 进程间的信号传递 Event

Event类提供了一种简单的方法来在进程之间传递状态信息。
当wait()超时时,它返回时不会出现错误。调用者负责使用is_set()检查事件的状态

import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print(‘wait_for_event: starting‘)
    e.wait()
    print(‘wait_for_event: e.is_set()->‘, e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print(‘wait_for_event_timeout: starting‘)
    e.wait(t)
    print(‘wait_for_event_timeout: e.is_set()->‘, e.is_set())


if __name__ == ‘__main__‘:
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name=‘block‘,
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name=‘nonblock‘,
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()
    print(‘main: waiting before calling Event.set()‘)
    time.sleep(3)
    e.set()
    print(‘main: event is set‘)

执行结果:

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True

4. 控制对资源的访问 Lock

在需要在多个进程之间共享单个资源的情况下,可以使用锁来避免冲突的访问。

import multiprocessing
import sys


def worker_with(lock):
    with lock:
        sys.stdout.write(‘Lock acquired via with\n)


def worker_no_with(lock):
    lock.acquire()
    try:
        sys.stdout.write(‘Lock acquired directly\n)
    finally:
        lock.release()

if __name__ == ‘__main__‘:
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock,),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock,),
    )

    w.start()
    nw.start()

    w.join()
    nw.join()

运行结果:

Lock acquired via with
Lock acquired directly

5. 同步操作 Condition

cond.wait()等着,cond.notify_all()通知可以往下运行了

import multiprocessing
import time


def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print(‘Starting‘, name)
    with cond:
        print(‘{} done and ready for stage 2‘.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print(‘Starting‘, name)
    with cond:
        cond.wait()
        print(‘{} running‘.format(name))


if __name__ == ‘__main__‘:
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name=‘s1‘,
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name=‘stage_2[{}]‘.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

运行结果:在这个例子中,两个进程并行地运行第二阶段的工作,但是只有在第一个阶段完成之后。

Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running

6. 控制对资源的并发访问 Semaphore

有时,允许多个worker一次访问一个资源是很有用的,但要限制了数量。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

运行结果:

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-1acquire
Process-1release

Process-5acquire
Process-4release

Process-5release

7. 管理共享状态 Manager

通过Manager共享信息,所有进程都能看得到。

import multiprocessing
import pprint


def worker(d, key, value):
    d[key] = value


if __name__ == ‘__main__‘:
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print(‘Results:‘, d

运行结果:通过Manager创建列表,它是共享的,并且在所有进程中都可以看到更新。字典也支持。

Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}

8. 共享命名空间 Manager

除了字典和列表之外,管理者还可以创建一个共享的名称空间。

import multiprocessing


def producer(ns, event):
    ns.value = ‘This is the value‘
    event.set()


def consumer(ns, event):
    try:
        print(‘Before event: {}‘.format(ns.value))
    except Exception as err:
        print(‘Before event, error:‘, str(err))
    event.wait()
    print(‘After event:‘, ns.value)


if __name__ == ‘__main__‘:
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

运行结果:可以看到在另一个进程中可以对mgr.Namespace()进行复制,其他进程可以访问。

Before event, error: ‘Namespace‘ object has no attribute ‘value‘
After event: This is the value

重要的是要知道mgr.Namespace()中可变值的内容的更新不会自动传播。

import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append(‘This is the value‘)
    event.set()


def consumer(ns, event):
    print(‘Before event:‘, ns.my_list)
    event.wait()
    print(‘After event :‘, ns.my_list)


if __name__ == ‘__main__‘:
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

运行结果:

Before event: []
After event : []

9. 进程池 multiprocessing.Pool

池类可用于管理固定数量的worker,用于简单的工作,在这些情况下,可以将工作分解并独立地分配给worker。

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print(‘Starting‘, multiprocessing.current_process().name)


if __name__ == ‘__main__‘:
    inputs = list(range(10))
    print(‘Input   :‘, inputs)

    builtin_outputs = map(do_calculation, inputs)
    print(‘Built-in:‘, builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print(‘Pool    :‘, pool_outputs)

运行结果:进程的返回值被收集并作为一个列表返回。

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x000000000256A080>
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-8
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

在默认情况下,池会创建固定数量的worker进程,并将作业传递给他们,直到没有更多的工作。设置maxtasksperchild参数告诉池在完成了几个任务后重新启动worker进程,防止长时间运行的worker消耗更多的系统资源。

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print(‘Starting‘, multiprocessing.current_process().name)


if __name__ == ‘__main__‘:
    inputs = list(range(10))
    print(‘Input   :‘, inputs)

    builtin_outputs = map(do_calculation, inputs)
    print(‘Built-in:‘, builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print(‘Pool    :‘, pool_outputs)

运行结果:当工人完成分配的任务时,即使没有更多的工作,他们也会重新开始工作。在这个输出中,有9个worker被创建,尽管只有10个任务,有的worker一次可以完成其中的两个任务。

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x00000000025CA080>
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-8
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-9
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

以上是关于多进程 — 信号传递与进程控的主要内容,如果未能解决你的问题,请参考以下文章

多进程(了解),守护进程,互斥锁,信号量,进程Queue与线程queue

第九篇:进程与线程

进程间通信之-----信号量

在 Python 多处理进程中运行较慢的 OpenCV 代码片段

请教一个Linux下C语言的进程间的信号问题

代码片段:Shell脚本实现重复执行和多进程