Python实战之用内置模块来构建REST服务RPC服务

Posted 山河已无恙

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python实战之用内置模块来构建REST服务RPC服务相关的知识,希望对你有一定的参考价值。

写在前面


  • 和小伙伴们分享一些Python 网络编程的一些笔记,博文为《Python Cookbook》读书后笔记整理

  • 博文涉及内容包括:

    • TCP/UDP服务构建
    • 不使用框架创建一个REST风格的web服务
    • 基于XML-RPC实现简单的RPC
    • 基于multiprocessing.connection实现简单的RPC
    • python实现作为客户端与HTTP服务交互
  • 理解不足小伙伴帮忙指正

傍晚时分,坐在屋檐下,看着天慢慢地黑下去,心里寂寞而凄凉,感到自己的生命被剥夺了。当时我是个年轻人,但我害怕这样生活下去,衰老下去。在我看来,这是比死亡更可怕的事。--------王小波


在Python中,构建一个静态Web服务器,只需要 python3 -m http.server 端口号( 端口号不指定默认是8000) 这一条命令就可以搞定了,之前也有看到有公司内网中,一些安装包放到服务器上每次FTP麻烦,用http模块的方式很方便。

python在网络方面封装一些内置模块,可以用很简洁的代码实现端到端的通信,比如HTTP、RPC服务等。

在编写RPC和REST服务之前,先来温习一下常见的的基于Socket模块的一些端到端的通信协议。不管是RPC还是REST都需要底层的通信协议来支持。

对于TCP和UPD协议,在常见的网络通信中,浏览器,邮件等一般应用程序在收发数据时都是通过TCP协议的,DNS等收发较短的控制数据时一般会使用UDP。

创建TCP服务

实现一个服务器,通过 TCP 协议和客户端通信。

创建一个 TCP 服务器的一个简单方法是使用socketserver库。一起来温习下面这个简单的TCP服务器

from socketserver import BaseRequestHandler, TCPServer

class EchoHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        while True:
            #接收客户端发送的数据, 这次接收数据的最大字节数是8192
            msg = self.request.recv(8192)
            # 接收的到数据在发送回去
            if not msg:
                break
            self.request.send(msg)

if __name__ == '__main__':
    # 20000端口,默认IP为本地IP,监听到消息交个EchoHandler处理器
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

代码很简单,指定IP暴露对应的端口,这里通过serv.serve_forever()来保证连接一直存在。

Got connection from ('127.0.0.1', 1675)

建立好服务端之后看下客户端

  • AF_INET:表示ipv4
  • SOCK_STREAM: tcp传输协议
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>

socketserver 默认情况下这种服务器是单线程的,一次只能为一个客户端连接服务。如果想通过多线程处理多个客户端,可以初始化一个ForkingTCPServer 或者是ThreadingTCPServer对象。

from socketserver import ThreadingTCPServer

if __name__ == '__main__':
    serv = ThreadingTCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

使用 fork 或线程服务器有个潜在问题就是它们会为每个客户端连接创建一个新的进程或线程。由于客户端连接数是没有限制的,因此一个恶意的黑客可以同时发送大量的连接让的服务器奔溃

可以创建一个预先分配大小的 工作线程池或进程池,先创建一个普通的非线程服务器,然后在一个线程池中使用serve forever()方法来启动它们。

if __name__ == '__main__':
    from threading import Thread
    NWORKERS = 16
    serv = TCPServer(('', 20000), EchoHandler)
    for n in range(NWORKERS):
        t = Thread(target=serv.serve_forever)
        t.daemon = True
        t.start()
    serv.serve_forever()

一般来讲,一个TCPServer在实例化的时候会绑定并激活相应的socket,如果 bind_and_activate 为真,则构造方法会自动调用server_bind() 和 server_activate()方法。其余参数传给基类 BaseServer,有时候想通过设置某些选项去调整底下的socket,可以设置参数bind_and_activate=False

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
    # Set up various socket options
    serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    # Bind and activate
    serv.server_bind()
    serv.server_activate()
    serv.serve_forever()

socket.SO_REUSEADDR允许服务器重新绑定一个之前使用过的端口号。由于要被经常使用到,它被放置到类变量中,可以直接在 TCPServer上面设置

当然,也可以不使用一个工具类,直接使用原生的Socket的编写TCP服务端

from socket import socket, AF_INET, SOCK_STREAM


def echo_handler(address, client_sock):
    print('Got connection from '.format(address))
    while True:
        msg = client_sock.recv(8192)
        if not msg:
            break
        client_sock.sendall(msg)
    client_sock.close()

def echo_server(address, backlog=5):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(address)
    sock.listen(backlog)
    while True:
        client_sock, client_addr = sock.accept()
        echo_handler(client_addr, client_sock)

if __name__ == '__main__':
    echo_server(('', 20000))

了解了TCP的服务通信,在来看一下UDP的

创建UDP服务

实现一个基于 UDP 协议的服务器来与客户端通信。

跟 TCP 一样,UDP 服务器也可以通过使用socketserver库很容易的被创建。例如,下面是一个简单的时间服务器:

from socketserver import BaseRequestHandler, UDPServer
import time
class TimeHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        # Get message and client socket request 属性是一个包含了数据报和底层 socket 对象的元组
        msg, sock = self.request
        resp = time.ctime()
        sock.sendto(resp.encode('ascii'), self.client_address)

if __name__ == '__main__':
    serv = UDPServer(('', 20000), TimeHandler)
    serv.serve_forever()

测试一下

>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Tue May  3 11:48:53 2022', ('127.0.0.1', 20000))
>>>

对于UPD协议而言,对于数据报的传送,应该使用 socket 的sendto() 和 recvfrom() 方法,因为是面向无连接的,没有建立连接的步骤,但是要在发生时跟着接受方

了解基本的通信协议之后,回到今天要讲的ERST接口。REST接口是基于HTTP协议的,而HTTP是直接依赖TCP的协议栈,负责约束表示层

创建一个简单的REST接口

使用一个简单的 REST 接口通过网络远程控制或访问的应用程序,但是又不想自己去安装一个完整的 web 框架。

可以构建一个 REST 风格的接口,最简单的方法是创建一个基于 WSGI 标准(Web服务网关接口,PEP 3333)的很小的库。类似支持REST风格的Python Web框架 Flask。

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File    :   app.py
@Time    :   2022/05/03 14:43:56
@Author  :   Li Ruilong
@Version :   1.0
@Contact :   1224965096@qq.com
@Desc    :   None
"""

# here put the import lib

import time
import cgi


def notfound_404(environ, start_response):
    start_response('404 Not Found', [('Content-type', 'text/plain')])
    return [b'Not Found']

# 核心控制器,用于路由注册
class PathDispatcher:
    def __init__(self):
        # 映射字典
        self.pathmap = 
    # 核心控制器的回调
    def __call__(self, environ, start_response):
        # 获取路由
        path = environ['PATH_INFO']
        # 获取请求参数
        params = cgi.FieldStorage(environ['wsgi.input'],
                                  environ=environ)
        # 获取请求方法
        method = environ['REQUEST_METHOD'].lower()
        environ['params'] = key: params.getvalue(key) for key in params
        # 找到映射的函数
        handler = self.pathmap.get((method, path), notfound_404)
        # 返回函数
        return handler(environ, start_response)

    def register(self, method, path, function):
        # 请求方法和路由作为K,执行函数为V
        self.pathmap[method.lower(), path] = function
        return function


_hello_resp = "wo jiao name"


def hello_world(environ, start_response):
    start_response('200 OK', [('Content-type', 'text/html')])
    params = environ['params']
    resp = _hello_resp.format(name=params.get('name'))
    yield resp.encode('utf-8')


_localtime_resp = "dang qian shjian t"

# 路由的回调
def localtime(environ, start_response):
    start_response('200 OK', [('Content-type', 'application/xml')])
    resp = _localtime_resp.format(t=time.localtime())
    yield resp.encode('utf-8')


if __name__ == '__main__':
    from wsgiref.simple_server import make_server
    # 创建一个核心控制器,用于路由注册
    dispatcher = PathDispatcher()
    # 注册路由,对应的回调方法
    dispatcher.register('GET', '/hello', hello_world)
    dispatcher.register('GET', '/localtime', localtime)
    # Launch a basic server 监听8080端口,注入核心控制器
    httpd = make_server('', 8080, dispatcher)
    print('Serving on port 8080...')
    httpd.serve_forever()

测试一下

┌──[root@liruilongs.github.io]-[~]
└─$coproc (./app.py)
[2] 130447

curl localhost:8080/hello

┌──[root@liruilongs.github.io]-[~]
└─$curl  localhost:8080/hello
127.0.0.1 - - [03/May/2022 16:09:12] "GET /hello HTTP/1.1" 200 12
wo jiao None

curl localhost:8080/hello?name=liruilong

┌──[root@liruilongs.github.io]-[~]
└─$curl  localhost:8080/hello?name=liruilong
127.0.0.1 - - [03/May/2022 16:09:47] "GET /hello?name=liruilong HTTP/1.1" 200 17
wo jiao liruilong
┌──[root@liruilongs.github.io]-[~]
└─$jobs
....
[2]-  运行中               coproc COPROC ( ./app.py ) &

实现一个简单的 REST 接口,只需让的程序代码满足 Python 的 WSGI标准即可。WSGI 被标准库支持,同时也被绝大部分第三方 web 框架支持

这里感觉Python WebWSGI标准Java Web 体系的Servlet规范特别接近,但是Servlet是侵入式的,同时需要特定的Web容器(Tomcat)支持,而WSGI好像对代码的影响很少…感兴趣小伙伴可以研究下.

另一方面,通过上面的代码,可以对当下这种Web端MVC的设计模式流程(Flask,Django,SpringMVC)有一个基本的认识,当然实际的框架要复杂的多。但是基本构建思路一样。

关于WSGI标准,简单来分析一下

以一个可调用对象形式来实现路由匹配要操作的方法

import cgi

def wsgi_app(environ, start_response):
    pass

environ 属性是一个字典,包含了从 web 服务器如 Apache[参考 Internet RFC 3875]提供的 CGI 接口中获取的值。要将这些不同的值提取出来,可以像这么这样写:

def wsgi_app(environ, start_response):
    method = environ['REQUEST_METHOD']
    path = environ['PATH_INFO']
    # Parse the query parameters
    params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)

start_response 参数是一个为了初始化一个请求对象而必须被调用的函数。第一个参数是返回的 HTTP 状态值,第二个参数是一个 (名, 值) 元组列表,用来构建返回的 HTTP 头

def wsgi_app(environ, start_response):
    pass
    start_response('200 OK', [('Content-type', 'text/plain')])

为了返回数据,一个 WSGI 程序必须返回一个字节字符串序列。可以像下面这样使用一个列表来完成

def wsgi_app(environ, start_response):
    pass
    start_response('200 OK', [('Content-type', 'text/plain')])
    resp = []
    resp.append(b'Hello World\\n')
    resp.append(b'Goodbye!\\n')
    return resp

或者,还可以使用 yield

def wsgi_app(environ, start_response):
    pass
    start_response('200 OK', [('Content-type', 'text/plain')])
    yield b'Hello World\\n'
    yield b'Goodbye!\\n'

最后返回的必须是字节字符串。如果返回结果包含文本字符串,必须先将其编码成字节。图片也是OK的

class WSGIApplication:
    def __init__(self):
        ...
    def __call__(self, environ, start_response)
        ...

PathDispatcher 类。这个分发器仅仅只是管理一个字典,将 (方法, 路径) 对映射到处理器函数上面。当一个请求到来时,它的方法和路径被提取出来,然后被分发到对应的处理器上面去。

    dispatcher = PathDispatcher()
    # 注册路由,对应的回调
    dispatcher.register('GET', '/hello', hello_world)
    dispatcher.register('GET', '/localtime', localtime)

任何查询变量会被解析后放到一个字典中,以 environ['params'] 形式存储。后面这个步骤太常见,所以建议在分发器里面完成,这样可以省掉很多重复代码。使用分发器的时候,只需简单的创建一个实例,然后通过它注册各种 WSGI 形式的函数。编写这些函数应该超级简单了,只要遵循 start_response() 函数的编写规则,并且最后返回字节字符串即可。

WSGI 本身是一个很小的标准。因此它并没有提供一些高级的特性比如认证、cookies、重定向、全局的异常处理等。这些自己实现起来也不难。不过如果想要更多的支持,可以考虑第三方库

上面服务端的构建,我们使用了curl工具来访问,那么作为客户端Python有哪些交互方式?

作为客户端与HTTP服务交互

需要通过 HTTP 协议以客户端的方式访问多种服务。例如,下载数据或者与基于 REST 的 API 进行交互。

对于简单的事情来说,通常使用urllib.request模块就够了.一个Get请求的Demo

┌──[root@liruilongs.github.io]-[~]
└─$python3
Python 3.6.8 (default, Nov 16 2020, 16:55:22)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from urllib import request, parse
>>> url = 'http://httpbin.org/get'
>>> parms = 
...     'name1': 'value1',
...     'name2': 'value2'
... 
>>> querystring = parse.urlencode(parms)
>>> querystring
'name1=value1&name2=value2'
>>> request.urlopen(url+'?' + querystring)
<http.client.HTTPResponse object at 0x7ffa0ef0f710>
>>> u = request.urlopen(url+'?' + querystring)
>>> u.read()
b'
    "args": 
        "name1": "value1",
        "name2": "value2"
    ,
    "headers": 
        "Accept-Encoding": "identity",
        "Host": "httpbin.org",
        "User-Agent": "Python-urllib/3.6",
        "X-Amzn-Trace-Id": "Root=1-62707b15-41a1169c0897c9001a07f948"
    ,
    "origin": "39.154.13.139",
    "url": "http://httpbin.org/get?name1=value1&name2=value2"
'

如果需要使用POST方法在请求主体中发送查询参数,可以将参数编码后作为可选参数提供给urlopen()函数,就像这样:

>>> from urllib import request, parse
>>> url = 'http://httpbin.org/post'
>>> parms = 
... 'name1' : 'value1',
... 'name2' : 'value2'
... 
>>> querystring = parse.urlencode(parms)
>>> querystring.encode('ascii')
b'name1=value1&name2=value2'
>>> u = request.urlopen(url, querystring.encode('ascii'))
>>> resp = u.read()
>>> resp
b'
    "args": ,
    "data": "",
    "files": ,
    "form": 
        "name1": "value1",
        "name2": "value2"
    ,
    "headers": 
        "Accept-Encoding": "identity",
        "Content-Length": "25",
        "Content-Type": "application/x-www-form-urlencoded",
        "Host": "httpbin.org",
        "User-Agent": "Python-urllib/3.6",
        "X-Amzn-Trace-Id": "Root=1-62707d24-15e9944760d3bbaa36c3714a"
    ,
    "json": null,
    "origin": "39.154.13.139",
    "url": "http://httpbin.org/post"
'
>>>

在发出的请求中提供一些自定义的 HTTP 请求首部,创建一个 Request 实例然后将其传给urlopen()

>>> from urllib import request, parse
>>> headers = 
... 'User-agent' : 'none/ofyourbusiness',
... 'Spam' : 'Eggs'
... 
>>> req = request.Request(url, querystring.encode('ascii'), headers=headers)
>>> u = request.urlopen(req)
>>> u.read()
b'
    "args": ,
    "data": "",
    "files": ,
    "form": 
        "name1": "value1",
        "name2": "value2"
    ,
    "headers": 
        "Accept-Encoding": "identity",
        "Content-Length": "25",
        "Content-Type": "application/x-www-form-urlencoded",
        "Host": "httpbin.org",
        "Spam": "Eggs",
        "User-Agent": "none/ofyourbusiness",
        "X-Amzn-Trace-Id": "Root=1-62707f0e-308a8137555e15797d950018"
    ,
    "json": null,
    "origin": "39.154.13.139",
    "url"以上是关于Python实战之用内置模块来构建REST服务RPC服务的主要内容,如果未能解决你的问题,请参考以下文章

Python实战之用内置模块来构建REST服务RPC服务

基于kafka rest实现资源访问服务化(实战)

Rest微服务构建 案例工程模块

Springcloud-Rest微服务构建

SpringCloud学习系列-Rest微服务构建-整体父工程Project

SpringCloud-2.0(4. Rest工程构建)