EffectivePython并发及并行

Posted LinBupt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EffectivePython并发及并行相关的知识,希望对你有一定的参考价值。

第40条:考虑用协程来并发地运行多个函数

线程有三个显著的缺点:

为了确保数据安全,我们必须使用特殊的工具来协调这些线程。这使得多线程的代码,要比单线程的过程式代码更加难懂。这种复杂的多线程代码,会逐渐另程序变得难于扩展和维护。

线程需要占用大量内存,每个正在执行的线程,大约占据8MB内存。

线程启动时的开销比较大。如果程序不停地依靠创建新线程来同时执行多个函数,并等待这些线程结束,那么使用线程所引发的开销,就会拖慢整个程序的速度。

Python的协程(coroutine)可以避免上述问题,它使得Python程序看上去好像是在同时运行多个函数。写成的实现方式,实际上是对生成器的一种扩展。启动生成器协程所需的开销,与调用函数的开销相仿。处于活跃状态的协程,在其耗尽之前,只会占用不到1KB的内存。

协程的工作原理:每当生成器函数执行到yield表达式的时候,消耗生成器的那段代码,就通过send方法给生成器回传一个值。而生成器在收到了经由send函数所传进来的这个值之后,会将其视为yield表达式的执行结果。

 1 def my_coroutine():
 2     while True:
 3         received = yield
 4         print(‘Received:‘, received)
 5 it = my_coroutine()
 6 next(it)  # Prime the coroutine
 7 it.send(‘First‘)
 8 it.send(‘Second‘)
 9 
10 >>>
11 Received:First
12 Received:Second

评估完当前的yield表达式之后,生成器还会继续推进到下一个yield表达式那里,并把那个yield关键字右侧的内容,当成send方法的返回值,返回给外界。

在生成器上面调用send方法之前,我们要先调用一次next函数,以便将生成器推进到第一条yield表达式那里。此后,我们可以把yield操作与send操作结合起来,令生成器能够根据外界所输入的数据,用一套标准的流程来产生对应的输出值。

def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)

it = minimize()
next(it) # Prime the generator
print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))

与线程类似,协程也是独立的函数,它可以消耗由外部环境所传进来的输入数据,并产生相应的输出结果。但与线程不同的是,协程会在生成器函数中的每个yield表达式那里暂停,等到外界再次调用send方法之后,它才会继续执行到下一个yield表达式。

这种奇妙的机制,使得消耗生成器的那段代码,可以在每执行完协程中的一条yield表达式之后,就进行相应的处理。例如,那段代码可以用生成器所产生的输出值,来调用其他函数,并更新程序的数据结构。更为重要的是,它可以通过这个输出值,来推进其他的生成器函数,使得那些生成器函数也执行到它们各自的下一条yield表达式处。接连推进多个独立的生成器,即可模拟出Python线程的并发行为,令程序看上去好像是在同时运行多个函数。

Query-->向生成器协程提供一种手段,使得协程能够借此向外围环境查询相关的信息。

下面这个协程,会针对本细胞的每一个相邻细胞,来产生与之对应的Query对象。每个yield表达式的结果,要么是ALIVE,要么是EMPTY。这就是协程与消费代码之间接口契约。其后,count_neighbors生成器会根据相邻细胞的生存状态,来返回本细胞周边的存活细胞个数。

 1 from collections import namedtuple
 2 def count_neighbors(y, x): # 获取相邻细胞的生存状态
 3     Query = namedtuple(‘Query‘, (‘y‘, ‘x‘))
 4     n_ = yield Query(y + 1, x + 0)  # North
 5     ne = yield Query(y + 1, x + 1)  # Northeast
 6     e_ = yield Query(y + 0, x + 1)
 7     se = yield Query(y - 1, x + 1)
 8     s_ = yield Query(y - 1, x + 0)
 9     sw = yield Query(y - 1, x - 1)
10     w_ = yield Query(y + 0, x - 1)
11     nw = yield Query(y + 1, x - 1)
12 
13     neighbor_status = [n_, ne, e_, se, s_, sw, w_, nw]
14     count = 0
15     for state in neighbor_status:
16         if state == ‘ALIVE‘:
17             count += 1
18     return count
19 
20 it = count_neighbors(10, 5)
21 q1 = next(it)
22 print(‘First yield: ‘, q1)
23 q2 = it.send(‘ALIVE‘)
24 print(‘Second yield:‘, q2)
25 q3 = it.send(‘ALIVE‘)
26 print(‘Third yield: ‘, q3)
27 q4 = it.send(‘ALIVE‘)
28 print(‘4th yield:‘, q4)
29 q5 = it.send(‘ALIVE‘)
30 print(‘5th yield: ‘, q5)
31 q6 = it.send(‘ALIVE‘)
32 print(‘6th yield:‘, q6)
33 q7 = it.send(‘ALIVE‘)
34 print(‘7th yield: ‘, q7)
35 q8 = it.send(‘ALIVE‘)
36 print(‘8th yield:‘, q8)
37 
38 try:
39     count = it.send(‘ALIVE‘)
40 except StopIteration as e:
41     print(‘Count: ‘, e.value)  # Value from return statement
42 
43 >>>
44 First yield:  Query(y=11, x=5)
45 Second yield: Query(y=11, x=6)
46 Third yield:  Query(y=10, x=6)
47 4th yield: Query(y=9, x=6)
48 5th yield:  Query(y=9, x=5)
49 6th yield: Query(y=9, x=4)
50 7th yield:  Query(y=10, x=4)
51 8th yield: Query(y=11, x=4)
52 Count:  8

用虚构的数据测试这个count_neighbors协程。针对本细胞的每个相邻细胞,向生成器索要一个Query对象,并根据该对象给出那个相邻细胞的存活状态。然后,通过send方法把状态发给协程,使count_neighbors协程可以收到上一个Query对象所对应的状态。最后,由于协程中的return语句会把生成器耗竭,所以程序届时将抛出StopIteration异常,而我们可以在处理该异常的过程中,得知本细胞周边的存活细胞数量。

count_neighbors协程把相邻的存活细胞数量统计出来之后,我们必须根据这个数量来更新本细胞的状态,于是,就需要用一种方式来表示状态的迁移。-->step_cell协程,这个生成器会产生Transition对象,用以表示本细胞的状态迁移。

step_cell协程会通过参数来接收当前细胞的网格坐标。它会根据此坐标产生Query对象,以查询本细胞的初始状态。接下来,它运行count_neighbors协程,以检视本细胞周边的其他细胞。此后,它运行game_logic函数,以判断本细胞在下一轮应该处于何种状态。最后,它生成Transition对象,把本细胞在下一轮所应有的状态,告诉外部代码。

1 def game_logic(state, neighbors):
2     pass
3 
4 def step_cell(y, x):
5     state = yield Query(y, x)
6     neighbors = yield from count_neighbors(y, x)
7     next_state = game_logic(state, neighbors)
8     yield Transition(y, x, next_state)

请注意,step_cell协程用yield from表达式来调用count_neighbors。在Python程序中,这种表达式可以把生成器协程组合起来,使开发者能够更加方便地复用小段的功能代码,并通过简单的协程来构建复杂的协程。count_neighbors协程耗竭之后,其最终的返回值(也就是return语句的返回值)会作为yield from表达式的结果,传给step_cell。

 1 # 测试step_cell协程
 2 from collections import namedtuple
 3 Query = namedtuple(Query, (y, x))
 4 Transition = namedtuple(Transition, (y, x, state))
 5 def count_neighbors(y, x):
 6     n_ = yield Query(y + 1, x + 0)  # North
 7     ne = yield Query(y + 1, x + 1)  # Northeast
 8     e_ = yield Query(y + 0, x + 1)
 9     se = yield Query(y - 1, x + 1)
10     s_ = yield Query(y - 1, x + 0)
11     sw = yield Query(y - 1, x - 1)
12     w_ = yield Query(y + 0, x - 1)
13     nw = yield Query(y + 1, x - 1)
14 
15     neighbor_status = [n_, ne, e_, se, s_, sw, w_, nw]
16     count = 0
17     for state in neighbor_status:
18         if state == ALIVE:
19             count += 1
20     return count
21 
22 def game_logic(state, neighbors):
23     if state == ALIVE:
24         if neighbors < 2:
25             return EMPTY
26         elif neighbors > 3:
27             return EMPTY
28     else:
29         if neighbors == 3:
30             return ALIVE
31     return state
32 
33 def step_cell(y, x):
34     state = yield Query(y, x)
35     neighbors = yield from count_neighbors(y, x)
36     next_state = game_logic(state, neighbors)
37     yield Transition(y, x, next_state)
38 
39 it = step_cell(10, 5)
40 q0 = next(it)  # initial location query
41 print(Me: , q0)
42 q1 = it.send(ALIVE)  #send my status, get neighbor query
43 print(Q1:, q1)
44 q2 = it.send(ALIVE)
45 print(Q2: , q2)
46 q3 = it.send(ALIVE)
47 print(Q3:, q3)
48 q4 = it.send(ALIVE)
49 print(Q4: , q4)
50 q5 = it.send(ALIVE)
51 print(Q5: , q5)
52 q6 = it.send(ALIVE)
53 print(Q6: , q6)
54 q7 = it.send(ALIVE)
55 print(Q7:, q7)
56 try:
57     count = it.send(ALIVE)
58 except StopIteration as e:
59     print(Count: , e.value)  # Value from return statement
60 
61 t1 = it.send(EMPTY)
62 print(Outcome: , t1)

生命游戏的目标,是要同时在网格中的每个细胞上面,运行刚才编写的那套游戏逻辑。为此,我们把step_cell协程组合到新的simulate协程之中。新的协程,会多次通过yield from表达式,来推进网格中的每一个细胞。把每个坐标点中的细胞都处理完之后,simulate协程会产生TICK对象,用以表示当前这代的细胞已经全部迁移完毕。

 

以上是关于EffectivePython并发及并行的主要内容,如果未能解决你的问题,请参考以下文章

goroutine简介

Python面试简介及并行并发

网络编程进阶及并发编程

程序进程和线程及并行和并发的区别

如何在 python 中并行化以下代码片段?

Python:多任务,并发,并行的理解及线程进程的对比