Python 协程

Posted onetoinf

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 协程相关的知识,希望对你有一定的参考价值。

协程历史

Python由于众所周知的GIL的原因,导致其线程无法发挥多核的并行计算能力(当然,后来有了multiprocessing,可以实现多进程并行),显得比较鸡肋。既然在GIL之下,同一时刻只能有一个线程在运行,那么对于CPU密集的程序来说,线程之间的切换开销就成了拖累,而以I/O为瓶颈的程序正是协程所擅长的:

多任务并发(非并行),每个任务在合适的时候挂起(发起I/O)和恢复(I/O结束)

协程,又称微线程,纤程。英文名Coroutine。

根据维基百科给出的定义,“协程 是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不同位置暂停或开始执行程序”。从技术的角度来说,“协程就是你可以暂停执行的函数”。如果你把它理解成“就像生成器一样”,那么你就想对了。

Python中的协程经历了很长的一段发展历程。其大概经历了如下三个阶段:

  1. 最初的生成器变形yield/send
  2. 引入@asyncio.coroutine和yield from
  3. 在最近的Python3.5版本中引入async/await关键字

协程结构

协程一般分为主程序(调用方)和子程序(生成器)

主程序很好理解,不多说;子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

# encoding:utf-8


'''消费者生产者模型

生产者 -- 生产东西 -- 主程 -- 不能写死循环
消费者 -- 消费东西 -- 协程 -- 往往写死循环
'''


def producer(c):
    c.send(None)
    n = 0
    while n < 5:
        n += 1
        print('生产者,生产了... %s' % n)
        r = c.send(n)
        print('生产者, 收到了回馈... %s' % r)
    c.close()


def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        else:
            print('消费者,消费了... %s' % n)


if __name__ == '__main__':
    g = generator()
    print(g)
    print(g.send(None))
    print(g.send(1))
    print(g.send(2))
    # c = consumer()
    # producer(c)

运行结果:

生产者,生产了... 1
消费者,消费了... 1
生产者, 收到了回馈... 
生产者,生产了... 2
消费者,消费了... 2
生产者, 收到了回馈... 
生产者,生产了... 3
消费者,消费了... 3
生产者, 收到了回馈... 
生产者,生产了... 4
消费者,消费了... 4
生产者, 收到了回馈... 
生产者,生产了... 5
消费者,消费了... 5
生产者, 收到了回馈... 
[Finished in 0.1s]

协程状态

协程当前状态可以使用inspect.getgeneratorstate(…) 函数确定,该函数会返回下述字符串中的一个:

  1. GEN_CREATED:等待开始执行
  2. GEN_RUNNING:解释器正在执行
  3. GEN_SUSPENED:在yield表达式处暂停
  4. GEN_CLOSED:执行结束

最先调用 next(sc) 函数这一步通常称为“预激”(prime)协程==(即,让协程向前执行到第一个 yield 表达式,准备好作为活跃的协程使用)。

>>> import inspect
>>> def simple_coroutine(a):
...     print('-> start')
...     
...     b = yield a
...     print('-> recived', a, b)
...     
...     c = yield a + b
...     print('-> recived', a, b, c)
...     
... 
>>> 
>>> sc = simple_coroutine(5)
>>> type(sc)
<class 'generator'>
>>> inspect.getgeneratorstate(sc)
'GEN_CREATED'
>>> next(sc)
-> start
5
>>> inspect.getgeneratorstate(sc)
'GEN_SUSPENDED'
>>> sc.send(6)
-> recived 5 6
11
>>> inspect.getgeneratorstate(sc)
'GEN_SUSPENDED'
>>> sc.send(7)
-> recived 5 6 7
Traceback (most recent call last):
  File "<input>", line 1, in <module>
    sc.send(7)
StopIteration
>>> inspect.getgeneratorstate(sc)
'GEN_CLOSED'
>>>

yield/send

  1. 第一次执行next(generator)时,会执行完yield语句后程序进行挂起,所有的参数和状态会进行保存。再一次执行next(generator)时,会从挂起的状态开始往后执行。在遇到程序的结尾或者遇到StopIteration时,循环结束。
  2. 可以通过generator.send(arg)来传入参数,这是协程模型。
  3. 可以通过generator.throw(exception)来传入一个异常。throw语句会消耗掉一个yield。可以通过generator.close()来手动关闭生成器。
  4. next()等价于send(None)

yield from

yield from用于重构生成器,简单的,可以这么使用:

def fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        yield b
        a, b = b, a + b
        index += 1
        
def copy_fib(n):
    print('I am copy from fib')
    yield from fib(n)
    print('Copy end')
print('-'*10 + 'test yield from' + '-'*10)
for fib_res in copy_fib(20):
    print(fib_res)

这种使用方式很简单,但远远不是yield from的全部。yield from 的主要功能是打开双向通道,像一个管道一样把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送(send信息传递给内层协程)和产出值(yield from),还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码

def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_cnt = yield b
        print('let me think {0} secs'.format(sleep_cnt))
        time.sleep(sleep_cnt)
        a, b = b, a + b
        index += 1
        
def copy_stupid_fib(n):
    print('I am copy from stupid fib')
    yield from stupid_fib(n)
    print('Copy end')
print('-'*10 + 'test yield from and send' + '-'*10)
N = 20
csfib = copy_stupid_fib(N)
fib_res = next(csfib)
while True:
    print(fib_res)
    try:
        fib_res = csfib.send(random.uniform(0, 0.5))
    except StopIteration:
        break

如果没有yield from,这里的copy_yield_from将会特别复杂(因为要自己处理各种异常)。

asyncio.coroutine

yield from在asyncio模块中得以发扬光大。先看示例代码:

@asyncio.coroutine
def smart_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.2)
        yield from asyncio.sleep(sleep_secs)
        print('Smart one think {} secs to get {}'.format(sleep_secs, b))
        a, b = b, a + b
        index += 1
 
@asyncio.coroutine
def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.4)
        yield from asyncio.sleep(sleep_secs)
        print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
        a, b = b, a + b
        index += 1
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.async(smart_fib(10)),
        asyncio.async(stupid_fib(10)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print('All fib finished.')
    loop.close()

asyncio是一个基于事件循环的实现异步I/O的模块。通过yield from,我们可以将协程asyncio.sleep的控制权交给事件循环,然后挂起当前协程;之后,由事件循环决定何时唤醒asyncio.sleep,接着向后执行代码。

这样说可能比较抽象,好在asyncio是一个由python实现的模块,那么我们来看看asyncio.sleep中都做了些什么:

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay,
                                future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

首先,sleep创建了一个Future对象,作为更内层的协程对象,通过yield from交给了事件循环;其次,它通过调用事件循环的call_later函数,注册了一个回调函数。

通过查看Future类的源码,可以看到,Future是一个实现了__iter__对象的生成器:

class Future:
    #blabla...
    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

那么当我们的协程yield from asyncio.sleep时,事件循环其实是与Future对象建立了练习。每次事件循环调用send(None)时,其实都会传递到Future对象的__iter__函数调用;而当Future尚未执行完毕的时候,就会yield self,也就意味着暂时挂起,等待下一次send(None)的唤醒。

当我们包装一个Future对象产生一个Task对象时,在Task对象初始化中,就会调用Future的send(None),并且为Future设置好回调函数。

class Task(futures.Future):
    #blabla...
    def _step(self, value=None, exc=None):
        #blabla...
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        #exception handle
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
        #blabla...
 
    def _wakeup(self, future):
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.

预设的时间过后,事件循环将调用Future._set_result_unless_cancelled:

class Future:
    #blabla...
    def _set_result_unless_cancelled(self, result):
        """Helper setting the result only if the future was not cancelled."""
        if self.cancelled():
            return
        self.set_result(result)
 
    def set_result(self, result):
        """Mark the future done and set its result.
 
        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

这将改变Future的状态,同时回调之前设定好的Tasks._wakeup;在_wakeup中,将会再次调用Tasks._step,这时,Future的状态已经标记为完成,因此,将不再yield self,而return语句将会触发一个StopIteration异常,此异常将会被Task._step捕获用于设置Task的结果。同时,整个yield from链条也将被唤醒,协程将继续往下执行。

async和await

弄清楚了asyncio.coroutine和yield from之后,在Python3.5中引入的async和await就不难理解了:可以将他们理解成asyncio.coroutine/yield from的完美替身。当然,从Python设计的角度来说,async/await让协程表面上独立于生成器而存在,将细节都隐藏于asyncio模块之下,语法更清晰明了。

async def smart_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.2)
        await asyncio.sleep(sleep_secs)
        print('Smart one think {} secs to get {}'.format(sleep_secs, b))
        a, b = b, a + b
        index += 1
 
async def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.4)
        await asyncio.sleep(sleep_secs)
        print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
        a, b = b, a + b
        index += 1
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(smart_fib(10)),
        asyncio.ensure_future(stupid_fib(10)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print('All fib finished.')
    loop.close()

使用案例

协程能自然地表述很多算法,例如仿真、游戏、异步 I/O,以及其他事件驱动型编程形式或协作式多任务。协程是 asyncio 包的基础构建。通过仿真系统能说明如何使用协程代替线程实现并发的活动。

在仿真领域,进程这个术语指代模型中某个实体的活动,与操作系统中的进程无关。仿真系统中的一个进程可以使用操作系统中的一个进程实现,但是通常会使用一个线程或一个协程实现。

出租车示例

import collections

# time 字段是事件发生时的仿真时间,
# proc 字段是出租车进程实例的编号,
# action 字段是描述活动的字符串。
Event = collections.namedtuple('Event', 'time proc action')


def taxi_process(proc_num, trips_num, start_time=0):
    """
    每次改变状态时创建事件,把控制权让给仿真器
    :param proc_num:
    :param trips_num:
    :param start_time:
    :return:
    """
    time = yield Event(start_time, proc_num, 'leave garage')

    for i in range(trips_num):
        time = yield Event(time, proc_num, 'pick up people')
        time = yield Event(time, proc_num, 'drop off people')

    yield Event(time, proc_num, 'go home')

# run
t1 = taxi_process(1, 1)
a = next(t1)    
print(a)    # Event(time=0, proc=1, action='leave garage')
b = t1.send(a.time + 6)
print(b)    # Event(time=6, proc=1, action='pick up people')
c = t1.send(b.time + 12)
print(c)    # Event(time=18, proc=1, action='drop off people')
d = t1.send(c.time + 1)
print(d)    # Event(time=19, proc=1, action='go home')

模拟控制台控制3个出租车异步

import collections
import queue
import random

# time 字段是事件发生时的仿真时间,
# proc 字段是出租车进程实例的编号,
# action 字段是描述活动的字符串。
Event = collections.namedtuple('Event', 'time proc action')


def taxi_process(proc_num, trips_num, start_time=0):
    """
    每次改变状态时创建事件,把控制权让给仿真器
    :param proc_num:
    :param trips_num:
    :param start_time:
    :return:
    """
    time = yield Event(start_time, proc_num, 'leave garage')

    for i in range(trips_num):
        time = yield Event(time, proc_num, 'pick up people')
        time = yield Event(time, proc_num, 'drop off people')

    yield Event(time, proc_num, 'go home')


class SimulateTaxi(object):
    """
    模拟出租车控制台
    """

    def __init__(self, proc_map):
        # 保存排定事件的 PriorityQueue 对象,
        # 如果进来的是tuple类型,则默认使用tuple[0]做排序
        self.events = queue.PriorityQueue()
        # procs_map 参数是一个字典,使用dict构建本地副本
        self.procs = dict(proc_map)

    def run(self, end_time):
        """
        排定并显示事件,直到时间结束
        :param end_time:
        :return:
        """
        for _, taxi_gen in self.procs.items():
            leave_evt = next(taxi_gen)
            self.events.put(leave_evt)

        # 仿真系统的主循环
        simulate_time = 0
        while simulate_time < end_time:
            if self.events.empty():
                print('*** end of events ***')
                break

            # 第一个事件的发生
            current_evt = self.events.get()
            simulate_time, proc_num, action = current_evt
            print('taxi:', proc_num, ', at time:', simulate_time, ', ', action)

            # 准备下个事件的发生
            proc_gen = self.procs[proc_num]
            next_simulate_time = simulate_time + self.compute_duration()

            try:
                next_evt = proc_gen.send(next_simulate_time)
            except StopIteration:
                del self.procs[proc_num]
            else:
                self.events.put(next_evt)
        else:
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

    @staticmethod
    def compute_duration():
        """
        随机产生下个事件发生的时间
        :return:
        """
        duration_time = random.randint(1, 20)
        return duration_time


# 生成3个出租车,现在全部都没有离开garage
taxis = {i: taxi_process(i, (i + 1) * 2, i * 5)
         for i in range(3)}

# 模拟运行
st = SimulateTaxi(taxis)
st.run(100)

以上是关于Python 协程的主要内容,如果未能解决你的问题,请参考以下文章

Python中的协程与asyncio原理

Python中的协程与asyncio原理

Python中的协程与asyncio原理

python3之协程

python协程

python协程