Flask 启动时的源码简析

Posted neozheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask 启动时的源码简析相关的知识,希望对你有一定的参考价值。

以前看Flask 上下文管理的时候记得新的请求到来的时候 app 会被执行,但一直不太明白 app 被执行的代码在哪里,这两天又大致看了一下Flask 相关的源码。

 

Flask 的启动入口是函数:

if __name__ == "__main__":
    app.run()

点进去可以看到 run() 方法的源码如下:(只截取了部分相关代码)

def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
    """Runs the application on a local development server.
    """
    
    """
    ...
    ...
    省略...
    """

    from werkzeug.serving import run_simple

    try:
        run_simple(host, port, self, **options)  # 第三个参数 self 即是 app (Flask 的实例对象)
    finally:
        # reset the first request information if the development server
        # reset normally.  This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False

 

从以上代码可以看到, app.run() 实际上调用了 werkzeug serving 中的 run_simple 方法。下面看 run_simple 方法做了些什么。

def run_simple(
    hostname, port, application, use_reloader=False,
    use_debugger=False, use_evalex=True, extra_files=None,
    reloader_interval=1, reloader_type="auto", threaded=False,
    processes=1, request_handler=None, static_files=None, passthrough_errors=False, ssl_context=None,
):
    """Start a WSGI application. 

    :param hostname: The host to bind to, for example ``\'localhost\'``.
        If the value is a path that starts with ``unix://`` it will bind
        to a Unix socket instead of a TCP socket..
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    
    """
    # ...
    # 省略无关代码...

    def inner():
        try:
            fd = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(
            hostname,
            port,
            application,
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # 省略无关代码 ...
    else:
        inner()

# 从上面代码可以看到,run_simple() 方法实际上调用了其中的 inner 方法, innner 方法执行了 srv.serve_forever()

 

下面我们看下 srv = make_server(...) 得到的是什么,以及 srv.serve_forever() 做了什么。

先看 srv = make_server(...) 的大致代码:

def make_server(
    host=None, port=None, app=None,
    threaded=False, processes=1, request_handler=None, passthrough_errors=False, ssl_context=None, fd=None,
):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

# 从上述代码可以看出, make_server 函数得到了 一个 WSGIServer 的实例

 

以 BaseWSGIServer 为例,我们看下 BaseWSGIServer 实例化得到了什么结果:

class BaseWSGIServer(HTTPServer, object):

    """Simple single-threaded, single-process WSGI server."""

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE

    def __init__(
        self,
        host,
        port,
        app,
        handler=None,
        passthrough_errors=False,
        ssl_context=None,
        fd=None,
    ):
        if handler is None:
            handler = WSGIRequestHandler    # 默认的请求处理器 handler 为 WSGIRequestHandler

        self.address_family = select_address_family(host, port)

        if fd is not None:
            # 省略 ...

        server_address = get_sockaddr(host, int(port), self.address_family)

        # remove socket file if it already exists
        if self.address_family == af_unix and os.path.exists(server_address):
            os.unlink(server_address)

        HTTPServer.__init__(self, server_address, handler)

        self.app = app      # 把 Flask 的实例对象 app 添加到了 WSGIServer 实例对象的app 属性中

        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

        # 省略 ...

    
    def serve_forever(self):
        self.shutdown_signal = False
        try:
            HTTPServer.serve_forever(self)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def get_request(self):
        con, info = self.socket.accept()
        return con, info

 

从上面代码看出, BaseWSGIServer 实例化的时候大致做的事情:

  1.  确定了请求处理器 handler 默认为 WSGIRequestHandler ; 
  2. BaseWSGIServer的父类 HTTPServer 利用 server_address 和 请求处理器 handler 进行实例化;
  3. 把 Flask 的实例对象app 添加到 BaseWSGIServer 实例对象的 app 属性中。

下面我们看下 HTTPServer.__init__(self, server_address, handler) 这行代码大概做了什么:(HTTPServer没有自己的 __init__ 方法,所以调用其父类 socketserver.TCPServer 的 __init__ 方法)

class TCPServer(BaseServer):

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass) # 执行了一些操作
self.socket
= socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: # 绑定IP和端口,并且开始监听 self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size)

由上可以看出, HTTPServer 利用 server_address 和 请求处理器 handler 进行实例化的时候,做了两件事:

  1. HTTPServer 的父类 BaseServer 利用 server_address 和 请求处理器 handler 的一个类(默认为 WSGIRequestHandler ),进行了一次实例化(得到的效果我们下面会接着分析);
  2. 绑定IP端口,并且开始监听

我们下面看下  BaseServer.__init__(self, server_address, RequestHandlerClass) 做了什么:

class BaseServer:
    
    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address    # 把 server_address 添加到 WSGIServer 实例对象的 server_address 属性中
        self.RequestHandlerClass = RequestHandlerClass    # 把 RequestHandlerClass 添加到WSGIServer 实例对象的 RequestHandlerClass 属性中,默认为 WSGIRequestHandler
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

BaseServer.__init__(self, server_address, RequestHandlerClass) 做了一个很重要的事就是在  WSGIServer 的实例对象中添加了 请求处理器的属性 WSGIRequestHandler

 

由上可知, run_simple 中做了一件很重要的事就是: srv = make_server(...); 

srv = make_server(...) 实现的主要作用有:

  1. make_server() 返回了一个WSGIServer的实例化对象;
  2. BaseWSGIServer 利用 host, port, app, request_handler 等参数进行实例化;
  3. 把 Flask 的实例化对象 app 添加到 BaseWSGIServer 的实例化对象的 app 属性中;
  4. 绑定IP和端口,并且开始监听;
  5. 给 WSGIServer 的实例化对象添加了 请求处理器的属性 RequestHandlerClass

 

下面我再看下 srv.serve_forever() 都做了些什么。

serve_forever 方法的代码如下:

BaseWSGIServer 中的 serve_forever 方法的代码如下:

class BaseWSGIServer(HTTPServer, object):
    def serve_forever(self):
        self.shutdown_signal = False
        try:
            HTTPServer.serve_forever(self)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

# srv 是 BaseWSGIServer 的实例化对象,srv在调用 serve_forever 的时候,实际上执行的是  HTTPServer.serve_forever(self)

 

HTTPServer.serve_forever(self) 的代码如下:
class BaseServer:
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    # 这里有个 while 的死循环,就是 block 住在等待新请求的到来
                    ready = selector.select(poll_interval)
                    if ready:     # 如果来了新的请求,就执行 _handle_request_noblock 方法
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

下面我们看下 _handle_request_noblock 方法相关的代码:

class BaseServer:

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()    # self.get_request() 就相当于 socket.accept(),所以得到的 request 是一个 connection
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)   # 这个是处理请求的代码 
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)    # finish_request是具体处理请求的方法
        self.shutdown_request(request)

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        # 由于执行的是 srv.serve_forever()方法,所以此处的 self 指的是 BaseWSGIServer 的实例化对象;
        # 由上面的代码分析可知, self.RequestHandlerClass 是一个请求处理的类,默认是 WSGIRequestHandler;
        # 所以,请求的处理过程实际上就是 WSGIRequestHandler 的实例化过程
        self.RequestHandlerClass(request, client_address, self)

 

下面我们看下 WSGIRequestHandler 的实例化过程都做了哪些事情,这些事情即是请求的处理过程:

WSGIRequestHandler类的继承关系如下:

class WSGIRequestHandler(BaseHTTPRequestHandler, object): # WSGIRequestHandler 继承 BaseHTTPRequestHandler

class BaseHTTPRequestHandler(socketserver.StreamRequestHandler): # BaseHTTPRequestHandler 继承 socketserver.StreamRequestHandler

class StreamRequestHandler(BaseRequestHandler): # StreamRequestHandler 继承 BaseRequestHandler

# 这几个类中只有 BaseRequestHandler 有 __init__ 方法,所以 WSGIRequestHandler 在实例化的时候会调用 BaseRequestHandler 的 __init__ 方法

 

BaseRequestHandler 的 __init__ 方法 代码如下:
class BaseRequestHandler:
    """Base class for request handler classes.
    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server    # server 即为 BaseWSGIServer 的实例化对象 srv
        self.setup()
        try:
            self.handle()     # 实例化的过程中主要是执行了这个方法
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

从上面可知,  WSGIRequestHandler(request, client_address, self) (self 为 BaseWSGIServer 的实例化对象 srv)在实例化的过程中,主要是执行了其 handle() 方法。

下面我们看下 handle 方法的代码:

class WSGIRequestHandler(BaseHTTPRequestHandler, object):
    def handle(self):
        """Handles a request ignoring dropped connections."""
        rv = None
        try:
            rv = BaseHTTPRequestHandler.handle(self)    # 1. 让其父类 BaseHTTPRequestHandler 调用 handle 方法;如下是 BaseHTTPRequestHandler 的 handle 方法
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is None or not is_ssl_error(e):
                raise
        if self.server.shutdown_signal:
            self.initiate_shutdown()
        return rv

    def handle_one_request(self):
        """Handle a single HTTP request."""
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
        elif self.parse_request():
            return self.run_wsgi()      # 3. 所以 WSGIRequestHandler 在实例化的过程中,实质上是调用了其 run_wsgi() 方法

    def run_wsgi(self):
        
        # 省略无关代码 ...

        def execute(app):
            application_iter = app(environ, start_response)     # 5. 重头戏:这行代码就是 app 被执行的代码 !!! app() 在执行的过程中传参为 environ 和 start_response
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
            finally:
                if hasattr(application_iter, "close"):
                    application_iter.close()
                application_iter = None

        try:
            execute(self.server.app)    # 4. run_wsgi 调用了 execute 方法; self.server 指的是 BaseWSGIServer 的实例化 srv,而且 BaseWSGIServer 在实例化的过程中也把 Flask 的实例化对象 app 添加到了 srv.app 属性中
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            # 省略无关代码...


class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        """Handle multiple requests if necessary."""
        self.close_connection = True

        self.handle_one_request()   # 2. 根据继承关系,调用其子类WSGIRequestHandler handle_one_request 方法来处理一次请求 
        while not self.close_connection:
            self.handle_one_request()

 

从上面的源码的第5步可以看出,app() 在执行的过程中传入的参数是 environ 和 start_response,app() 执行会调用 Flask 的 __call__ 方法,而且 Flask 的 __call__(...) 需要传入的参数也是 environ 和 start_response,如下所示:

class Flask(_PackageBoundObject):
    def __call__(self, environ, start_response):
        """The WSGI server calls the Flask application object as the
        WSGI application. This calls :meth:`wsgi_app` which can be
        wrapped to applying middleware."""
        return self.wsgi_app(environ, start_response) 

然后,由上面的 wsgi_app ,就引出了 flask 的上下文管理机制。

 

以上是关于Flask 启动时的源码简析的主要内容,如果未能解决你的问题,请参考以下文章

80Flask用法简析

ManActivity + 启动时的片段差异

Tomcat启动流程简析

Flask的key与pin安全简析

flask-web ——RPC实际项目业务简析

第五节:JQuery框架源码简析