How the asyncio Event Loop Works

Posted by wodeboke-y



1. Overview

1.1. Task creation

Tasks are created with the loop's create_task method.

  
    def create_task(self, coro):  
        """Schedule a coroutine object.  

        Return a task object.  
        """  
        self._check_closed()  
        if self._task_factory is None:  
            task = tasks.Task(coro, loop=self)  
            if task._source_traceback:  
                del task._source_traceback[-1]  
        else:  
            task = self._task_factory(self, coro)  
        return task  

create_task instantiates Task(), whose constructor looks like this:

  
    def __init__(self, coro, *, loop=None):  
        super().__init__(loop=loop)  
        if self._source_traceback:  
            del self._source_traceback[-1]  
        if not coroutines.iscoroutine(coro):  
            # raise after Future.__init__(), attrs are required for __del__  
            # prevent logging for pending task in __del__  
            self._log_destroy_pending = False  
            raise TypeError(f"a coroutine was expected, got {coro!r}")  

        self._must_cancel = False  
        self._fut_waiter = None  
        self._coro = coro  
        self._context = contextvars.copy_context()  

        self._loop.call_soon(self.__step, context=self._context)  
        _register_task(self)  

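The key line is self._loop.call_soon(self.__step, context=self._context).

call_soon adds a callback to the loop's queue of callbacks that are ready to run.
That queue is loop._ready, a collections.deque().
Its implementation is covered in its own section (1.3).

What gets scheduled here is the task's own __step() method: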

  
    def __step(self, exc=None):  
        if self.done():  
            raise futures.InvalidStateError(  
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:  
            if not isinstance(exc, futures.CancelledError):  
                exc = futures.CancelledError()  
            self._must_cancel = False  
        coro = self._coro  
        self._fut_waiter = None  

        _enter_task(self._loop, self)  
        # Call either coro.throw(exc) or coro.send(None).  
        try:  
            if exc is None:  
                # We use the `send` method directly, because coroutines  
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)  
            else:  
                result = coro.throw(exc)  
        except StopIteration as exc:  
            if self._must_cancel:  
                # Task is cancelled right before coro stops.  
                self._must_cancel = False  
                super().set_exception(futures.CancelledError())  
            else:  
                super().set_result(exc.value)  
        except futures.CancelledError:  
            super().cancel()  # I.e., Future.cancel(self).  
        except Exception as exc:  
            super().set_exception(exc)  

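Stripped of the less important parts, the core comes down to a few pieces:
coro = self._coro, the coroutine object that was passed in
try, send, throw, StopIteration, CancelledError
This should look familiar: the method sends a value into the coroutine and then, depending on whether the coroutine finishes with StopIteration, is cancelled, or raises another exception, sets the task's result, cancels the task, or records the exception.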
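To make the send/StopIteration mechanics concrete, here is a minimal sketch that drives a coroutine by hand, without any event loop. The names pause, work, and drive are hypothetical and not part of asyncio. Unlike Task.__step, which performs one send per call and relies on the loop to call it again, this sketch simply loops until the coroutine finishes.

```python
import types


@types.coroutine
def pause():
    # Hypothetical awaitable: yields control back to whoever drives the coroutine.
    yield "paused"


async def work():
    await pause()
    return 42


def drive(coro):
    """Drive a coroutine to completion by calling send(None) repeatedly."""
    yielded = []
    while True:
        try:
            # Each send() resumes the coroutine until its next suspension point.
            yielded.append(coro.send(None))
        except StopIteration as exc:
            # The coroutine's return value travels in StopIteration.value.
            return yielded, exc.value


result = drive(work())
```

Here result holds both the values the coroutine yielded while suspended and its final return value, which is the same information Task.__step uses to decide between waiting and calling set_result.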

1.2. The loop and task execution

  
    def run_forever(self):  
        """Run until stop() is called."""  
        self._check_closed()  
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)  
        self._thread_id = threading.get_ident()  

        old_agen_hooks = sys.get_asyncgen_hooks()  
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,  
                               finalizer=self._asyncgen_finalizer_hook)  
        try:  
            events._set_running_loop(self)  # store the loop and pid in _RunningLoop, mainly for bookkeeping
            while True:
                self._run_once()  # the key line
                if self._stopping:  
                    break  
        finally:  
            self._stopping = False  
            self._thread_id = None  
            events._set_running_loop(None)  
            self._set_coroutine_origin_tracking(False)  
            sys.set_asyncgen_hooks(*old_agen_hooks)  

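The other ways of starting the loop (for example, run_until_complete) essentially end up calling this method.
Its core is the while loop, and the key line is self._run_once().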

  
    def _run_once(self):  
        """Run one full iteration of the event loop.  

        This calls all currently ready callbacks, polls for I/O,  
        schedules the resulting callbacks, and finally schedules  
        'call_later' callbacks.
        """  

        # ... (timer cleanup, selector polling, and moving due timers to _ready omitted) ...

        ntodo = len(self._ready)  
        for i in range(ntodo):  
            handle = self._ready.popleft()  
            if handle._cancelled:  
                continue  
            if self._debug:  
                try:  
                    self._current_handle = handle  
                    t0 = self.time()  
                    handle._run()  
                    dt = self.time() - t0  
                    if dt >= self.slow_callback_duration:  
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)  
                finally:  
                    self._current_handle = None  
            else:  
                handle._run()  # the key line

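_run_once runs one full iteration of the event loop: it calls all currently ready callbacks, polls for I/O, schedules the resulting callbacks, and finally schedules the call_later callbacks.

In the excerpt above, it ultimately does two things:

  • Pop the leftmost element from _ready with popleft; this is a Handle object.
  • Execute handle._run().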
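The two steps above can be sketched as a toy loop. ToyHandle and ToyLoop are hypothetical names invented for this illustration; the sketch keeps only the ready queue and the stop flag, and leaves out timers, the selector, and debug mode.

```python
from collections import deque


class ToyHandle:
    """Hypothetical stand-in for events.Handle: a callback plus its arguments."""

    def __init__(self, callback, args):
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        self._cancelled = True

    def _run(self):
        self._callback(*self._args)


class ToyLoop:
    """Hypothetical, heavily simplified loop: ready queue only, no timers or I/O."""

    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        handle = ToyHandle(callback, args)
        self._ready.append(handle)
        return handle

    def stop(self):
        self._stopping = True

    def _run_once(self):
        # Snapshot the length first, so callbacks scheduled during this
        # pass wait for the next pass instead of running immediately.
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            handle._run()

    def run_forever(self):
        try:
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False


log = []
loop = ToyLoop()
loop.call_soon(log.append, "first")
skipped = loop.call_soon(log.append, "never runs")
skipped.cancel()
loop.call_soon(log.append, "second")
loop.call_soon(loop.stop)
loop.run_forever()
```

After run_forever returns, log contains only the entries from the handles that were not cancelled, in the order they were scheduled.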

1.3. call_soon

  
    def call_soon(self, callback, *args, context=None):  
        """Arrange for a callback to be called as soon as possible.  

        This operates as a FIFO queue: callbacks are called in the  
        order in which they are registered.  Each callback will be  
        called exactly once.  

        Any positional arguments after the callback will be passed to  
        the callback when it is called.  
        """  
        self._check_closed()  
        if self._debug:  
            self._check_thread()  
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)  
        if handle._source_traceback:  
            del handle._source_traceback[-1]  
        return handle  

    def _call_soon(self, callback, args, context):  
        handle = events.Handle(callback, args, self, context)  
        if handle._source_traceback:  
            del handle._source_traceback[-1]  
        self._ready.append(handle)  
        return handle  

Key lines:
Build the handle: handle = events.Handle(callback, args, self, context)
Append it to the ready queue: self._ready.append(handle)
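The FIFO behaviour described in the docstring can be observed with the public API. This is a small sketch, not taken from the original post: it schedules three callbacks on a fresh loop, cancels one through the returned Handle, and uses loop.stop to end run_forever.

```python
import asyncio

order = []
loop = asyncio.new_event_loop()
try:
    loop.call_soon(order.append, "a")
    loop.call_soon(order.append, "b")
    cancelled = loop.call_soon(order.append, "c")
    cancelled.cancel()         # marks the handle so the loop skips it
    loop.call_soon(loop.stop)  # makes run_forever() return after this pass
    loop.run_forever()
finally:
    loop.close()
```

The callbacks run in registration order, and the cancelled handle is skipped, matching the handle._cancelled check in _run_once.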

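1.4. Summary

To summarize the idea behind asyncio's implementation:
There is a task scheduler, the event loop. A coroutine that needs to run is wrapped in a task and added to the event loop's scheduling queue (in the form of a Handle).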

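In each frame (iteration) of the event loop, it checks which tasks need to run and then runs them. A task may produce its final result, or it may run partway and then await another task. Tasks wait on one another and are chained together through callbacks (the commonly used interfaces are covered in more depth later; see Appendix 2 for implementation details).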
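As a small illustration of tasks waiting on one another, the sketch below has one task await another. The coroutine names are hypothetical. When double_it awaits inner_task, the outer task is suspended and, in CPython's Task implementation, resumed through a done callback registered on the inner task.

```python
import asyncio


async def fetch_number():
    await asyncio.sleep(0)  # give control back to the loop once
    return 20


async def double_it(inner_task):
    value = await inner_task  # suspends until inner_task is done
    return value * 2


async def main():
    inner = asyncio.create_task(fetch_number())
    outer = asyncio.create_task(double_it(inner))
    return await outer


answer = asyncio.run(main())
```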

A task may depend on I/O events. In every frame, the event loop uses a selector to handle those events and run the corresponding callback functions.
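The selector step can be sketched with the standard selectors module on its own. This is an illustration under simplifying assumptions, not asyncio's code: the callback is stored as the registration's data and invoked directly, whereas the real loop wraps callbacks in handles and appends them to _ready. The name on_readable is hypothetical.

```python
import selectors
import socket

received = []


def on_readable(sock):
    # Callback to run when the selector reports the socket as readable.
    received.append(sock.recv(1024))


selector = selectors.DefaultSelector()
reader, writer = socket.socketpair()
try:
    # Store the callback as the data attached to this registration.
    selector.register(reader, selectors.EVENT_READ, on_readable)
    writer.send(b"ping")
    # One "frame": poll for ready file objects and dispatch their callbacks.
    for key, _mask in selector.select(timeout=1.0):
        key.data(key.fileobj)
finally:
    selector.unregister(reader)
    selector.close()
    reader.close()
    writer.close()
```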

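In this walkthrough there is only one event loop, and it runs in the main thread. An event loop can also hand work to a thread pool, and multiple threads can each run their own event loop and interact with one another; those cases are not covered here.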
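Although thread pools are outside the scope of this walkthrough, a brief sketch shows what handing work to one looks like with loop.run_in_executor. Passing None selects the loop's default executor; blocking_work is a hypothetical function standing in for any blocking call.

```python
import asyncio
import threading
import time


def blocking_work():
    # Runs in a worker thread, so the sleep does not block the event loop.
    time.sleep(0.05)
    return threading.get_ident()


async def main():
    loop = asyncio.get_running_loop()
    worker_thread = await loop.run_in_executor(None, blocking_work)
    return worker_thread, threading.get_ident()


worker_thread, loop_thread = asyncio.run(main())
```

The two thread identifiers differ, which confirms that the blocking function ran outside the thread that runs the event loop.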

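Running a single event loop in a single thread has an advantage: as long as a coroutine does not voluntarily await, it keeps the thread. In other words, a stretch of synchronous code is guaranteed to be free of data races with other coroutines. A multithreaded design has to take locks to avoid data races, which in many cases hurts performance. This makes the coroutine approach a good fit for workloads with many users that share no data with one another. If parallelism is needed, run several processes.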
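The no-data-race property can be checked with a small experiment. In this sketch (names are hypothetical), ten coroutines perform an unlocked read-modify-write on shared state; because there is no await between the read and the write, no other coroutine can interleave with it.

```python
import asyncio

state = {"counter": 0}


async def add_many(n):
    for _ in range(n):
        # Read-modify-write with no await in between: nothing else can
        # run in the middle of it on this single-threaded loop.
        current = state["counter"]
        state["counter"] = current + 1
        await asyncio.sleep(0)  # task switches can happen only here


async def main():
    await asyncio.gather(*(add_many(100) for _ in range(10)))


asyncio.run(main())
```

No lock is needed and no increment is lost. Moving the await between the read and the write would reintroduce lost updates, which is why the guarantee only covers code between await points.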









