10 分钟入门 python rpc 实现
Posted 游戏不存在
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10 分钟入门 python rpc 实现相关的知识,希望对你有一定的参考价值。
XML-RPC 是一种远程过程调用方法,它使用通过 HTTP 传递的 XML 作为载体。有了它,客户端可以在远程服务器上调用带参数的服务器方法(服务器以 URI 命名)并获取结构化的数据。python自带xmlrpc实现,学习xmlrpc,可以让我们快速了解rpc的实现及原理,本文包括下面几个部分:
-
xmlrpc演示 -
xmlrpc-API -
xmlrpc-server实现 -
xmlrpc-client实现 -
xmlrpc序列化/反序列化 -
小结
xmlrpc演示
xmlrpc可以直接运行,启动rpc服务:
# python3 -m xmlrpc.server
Serving XML-RPC on localhost port 8000
It is advisable to run this example server within a secure, closed network.
127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -
127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -
启动rpc客户端:
# python3 -m xmlrpc.client
20210505T18:03:16
42
512
3
从服务端可以看到2个http请求。我们先看抓包得到的第一个http请求报文:
METHOD: POST
URL: http://localhost:8000/RPC2
HEADERS
accept-encoding: gzip
content-length: 120
content-type: text/xml
host: localhost:8000
user-agent: Python-xmlrpc/3.8
请求的数据是:
<?xml version='1.0'?>
<methodCall>
<methodName>
currentTime.getCurrentTime
</methodName>
<params>
</params>
</methodCall>
http响应报文:
STATUS: 200 OK
HEADERS
content-length: 163
content-type: text/xml
date: Wed, 05 May 2021 09:40:57 GMT
server: BaseHTTP/0.6 Python/3.8.5
响应的数据是:
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<dateTime.iso8601>
20210505T18:03:16
</dateTime.iso8601>
</value>
</param>
</params>
</methodResponse>
第二个请求的数据报文我就不贴出来了,下面是请求xml:
<?xml version='1.0'?>
<methodCall>
<methodName>
system.multicall
</methodName>
<params>
<param>
<value>
<array>
<data>
<value>
<struct>
<member>
<name>
methodName
</name>
<value>
<string>
getData
</string>
</value>
</member>
<member>
<name>
params
</name>
<value>
<array>
<data>
</data>
</array>
</value>
</member>
</struct>
</value>
<value>
<struct>
<member>
<name>
methodName
</name>
<value>
<string>
pow
</string>
</value>
</member>
<member>
<name>
params
</name>
<value>
<array>
<data>
<value>
<int>
2
</int>
</value>
<value>
<int>
9
</int>
</value>
</data>
</array>
</value>
</member>
</struct>
</value>
<value>
<struct>
<member>
<name>
methodName
</name>
<value>
<string>
add
</string>
</value>
</member>
<member>
<name>
params
</name>
<value>
<array>
<data>
<value>
<int>
1
</int>
</value>
<value>
<int>
2
</int>
</value>
</data>
</array>
</value>
</member>
</struct>
</value>
</data>
</array>
</value>
</param>
</params>
</methodCall>
下面是响应xml:
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<array>
<data>
<value>
<string>
42
</string>
</value>
</data>
</array>
</value>
<value>
<array>
<data>
<value>
<int>
512
</int>
</value>
</data>
</array>
</value>
<value>
<array>
<data>
<value>
<int>
3
</int>
</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
注: 为了完整展示xmlrpc数据,所以我贴了xml全文,导致内容有点长。
从演示可以看到xmlrpc下面几个特点:
-
使用http协议进行数据传输。使用 POST
方法,url是RPC2
,content-type是text/xml
。 -
使用xml对请求/响应进行编码。请求使用 methodCall
标签, 响应使用methodResponse
标签。 -
使用xml数据层层嵌套,冗余较多,看起来非常烦琐 (这应该是xmlrpc没流行起来的原因之一)。
xmlrpc-API
继续查看xmlrpc的API使用,服务端代码:
class ExampleService:
def getData(self):
return '42'
class currentTime:
@staticmethod
def getCurrentTime():
return datetime.datetime.now()
with SimpleXMLRPCServer(("localhost", 8000)) as server:
server.register_function(pow)
server.register_function(lambda x,y: x+y, 'add')
server.register_instance(ExampleService(), allow_dotted_names=True)
server.register_multicall_functions()
print('Serving XML-RPC on localhost port 8000')
print('It is advisable to run this example server within a secure, closed network.')
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
服务端做了下面几件事:
-
在8000端口创建SimpleXMLRPCServer的实例server -
向server注册 pow
和名为add
的lambda函数接口 -
向server注册服务实例instance(可能叫app更合适),instance带有2个函数实现: getData
和currentTime.getCurrentTime
-
向server注册system.multicall实现,这个实现支持多个rpc请求合并使用一个http请求 -
启动server
客户端是这样使用的:
server = ServerProxy("http://localhost:8000")
print(server.currentTime.getCurrentTime())
multi = MultiCall(server)
multi.getData()
multi.pow(2,9)
multi.add(1,2)
try:
for response in multi():
print(response)
except Error as v:
print("ERROR", v)
-
创建了一个服务代理(可以理解为rpc-client) -
调用服务端的实现 currentTime.getCurrentTime
-
使用MultiCall的方式调用 getData
,pow
和add
三个rpc接口 -
发送multicall请求,并循环打印服务调用结果
可以很明显的对比出xmlrpc服务和http服务的不同:
-
rpc服务接口都是普通的函数,比如pow,getData和getCurrentTime;这些接口和http的request和response是隔离的 -
客户端需要额外实现,并不是直接发送的http请求
同时大家对 RPC(remote procedure call) 应该也有直观了解,简单的解释就是远程函数调用。所谓远程:跨机器是远程,我们这里的跨进程也是远程。至于如何实现远程函数调用,就是各个RPC框架的功能了,今天我们先看看xmlrpc的实现。
xmlrpc-server的实现
服务端http协议实现
xmlrpc中http协议由SimpleXMLRPCServer和SimpleXMLRPCRequestHandler实现:
class SimpleXMLRPCServer(socketserver.TCPServer,
SimpleXMLRPCDispatcher):
allow_reuse_address = True
_send_traceback_header = False
def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,
logRequests=True, allow_none=False, encoding=None,
bind_and_activate=True, use_builtin_types=False):
self.logRequests = logRequests
SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding, use_builtin_types)
socketserver.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)
SimpleXMLRPCServer的父类TCPServer在之前的博文中有介绍,提供tcp服务的实现。SimpleXMLRPCRequestHandler负责http协议部分的实现,而xmlrpc规范是必须使用POST请求到 /RPC2
重点就在 do_POST 方法:
class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler):
# rpc-url
rpc_paths = ('/', '/RPC2')
def do_POST(self):
...
max_chunk_size = 10*1024*1024
size_remaining = int(self.headers["content-length"])
L = []
while size_remaining:
chunk_size = min(size_remaining, max_chunk_size)
chunk = self.rfile.read(chunk_size)
if not chunk:
break
L.append(chunk)
size_remaining -= len(L[-1])
data = b''.join(L)
...
response = self.server._marshaled_dispatch(
data, getattr(self, '_dispatch', None), self.path
)
...
self.send_response(200)
self.send_header("Content-type", "text/xml")
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
do_POST方法分三段:
-
从http请求上读取请求数据,数据长度由 content-length 决定 -
使用server的_marshaled_dispatch方法调用rpc接口 -
将接口返回值包装成http响应返回
服务端rpc协议实现
SimpleXMLRPCServer的另外一个父类SimpleXMLRPCDispatcher提供了rpc协议的实现:
class SimpleXMLRPCDispatcher:
def __init__(self, allow_none=False, encoding=None,
use_builtin_types=False):
# 接口函数字典
self.funcs = {}
# 服务实例 (app)
self.instance = None
self.allow_none = allow_none
self.encoding = encoding or 'utf-8'
self.use_builtin_types = use_builtin_types
def register_instance(self, instance, allow_dotted_names=False):
# 注册服务实例对象
self.instance = instance
self.allow_dotted_names = allow_dotted_names
def register_function(self, function=None, name=None):
# 注册接口方法
if name is None:
name = function.__name__
self.funcs[name] = function
return function
def register_multicall_functions(self):
"""Registers the XML-RPC multicall method in the system
namespace."""
# 复合调用
self.funcs.update({'system.multicall' : self.system_multicall})
instace和function的注册比较简单,我们可以跳过实现会略微复杂一点的multical,先看看注册的接口如何在_marshaled_dispatch中调用:
def _marshaled_dispatch(self, data, dispatch_method = None, path = None):
try:
# 解析rpc接口和参数
params, method = loads(data, use_builtin_types=self.use_builtin_types)
# generate response
response = self._dispatch(method, params)
# wrap response in a singleton tuple
response = (response,)
# 生成xml响应
response = dumps(response, methodresponse=1,
allow_none=self.allow_none, encoding=self.encoding)
except Fault as fault:
...
except:
...
return response.encode(self.encoding, 'xmlcharrefreplace')
def _dispatch(self, method, params):
try:
# call the matching registered function
# 查找接口
func = self.funcs[method]
except KeyError:
pass
else:
if func is not None:
# 执行接口
return func(*params)
...
if self.instance is not None:
if hasattr(self.instance, '_dispatch'):
# call the `_dispatch` method on the instance
return self.instance._dispatch(method, params)
# call the instance's method directly
try:
func = resolve_dotted_attribute(
self.instance,
method,
self.allow_dotted_names
)
except AttributeError:
pass
else:
if func is not None:
return func(*params)
...
代码比较长,主要做了2件事:
-
从请求中解析 params 和 method -
根据method从func或者instance中调用方法并返回
服务端multi-call实现
了解 single-call 后,再回头看 multi-call 的实现,就比较容易。注册接口:
def register_multicall_functions(self):
self.funcs.update({'system.multicall' : self.system_multicall})
funcs字典中会增加一个名为 system.multicall ,处理函数为system_multicall的调用:
def system_multicall(self, call_list):
"""system.multicall([{'methodName': 'add', 'params': [2, 2]}, ...]) => \
[[4], ...]
Allows the caller to package multiple XML-RPC calls into a single
request.
See http://www.xmlrpc.com/discuss/msgReader$1208
"""
results = []
# 顺序执行多个call
for call in call_list:
method_name = call['methodName']
params = call['params']
...
results.append([self._dispatch(method_name, params)])
...
return results
system_multicall和注释介绍一样,就是从请求中接受多个请求,然后逐一调用执行。system.multicall的调用数据示例:
<methodName>
system.multicall
</methodName>
<params>
...
<member>
<name>
methodName
</name>
<value>
<string>
getData
</string>
</value>
</member>
...
<params>
xmlrpc-client实现
客户端http协议实现
客户端也需要实现http协议,主要在ServerProxy和Transport中(SafeTransport实现https)。ServerProxy包装Transport对象:
class ServerProxy:
def __init__(self, uri, transport=None, encoding=None, verbose=False,
allow_none=False, use_datetime=False, use_builtin_types=False,
*, headers=(), context=None):
# get the url
type, uri = urllib.parse._splittype(uri)
self.__host, self.__handler = urllib.parse._splithost(uri)
..
handler = Transport
extra_kwargs = {}
transport = handler(use_datetime=use_datetime,
use_builtin_types=use_builtin_types,
headers=headers,
**extra_kwargs)
self.__transport = transport
...
def __request(self, methodname, params):
# call a method on the remote server
# 接口调用转为xml请求数据
request = dumps(params, methodname, encoding=self.__encoding,
allow_none=self.__allow_none).encode(self.__encoding, 'xmlcharrefreplace')
response = self.__transport.request(
self.__host,
self.__handler,
request,
verbose=self.__verbose
)
return response
Transport实现http细节:
class Transport:
"""Handles an HTTP transaction to an XML-RPC server."""
def __init__(self, use_datetime=False, use_builtin_types=False,
*, headers=()):
self._use_datetime = use_datetime
self._use_builtin_types = use_builtin_types
self._connection = (None, None)
self._headers = list(headers)
self._extra_headers = []
def request(self, host, handler, request_body, verbose=False):
http_conn = self.send_request(host, handler, request_body, verbose)
resp = http_conn.getresponse()
if resp.status == 200:
self.verbose = verbose
return self.parse_response(resp)
def send_request(self, host, handler, request_body, debug):
connection = self.make_connection(host)
headers = self._headers + self._extra_headers
...
connection.putrequest("POST", handler)
headers.append(("Content-Type", "text/xml"))
headers.append(("User-Agent", self.user_agent))
self.send_headers(connection, headers)
self.send_content(connection, request_body)
return connection
def make_connection(self, host):
if self._connection and host == self._connection[0]:
return self._connection[1]
# create a HTTP connection object from a host descriptor
chost, self._extra_headers, x509 = self.get_host_info(host)
self._connection = host, http.client.HTTPConnection(chost)
return self._connection[1]
def parse_response(self, response):
stream = response
p, u = self.getparser()
while 1:
data = stream.read(1024)
if not data:
break
if self.verbose:
print("body:", repr(data))
p.feed(data)
if stream is not response:
stream.close()
p.close()
return u.close()
-
使用http.client创建http连接 -
使用send_request发送http请求 -
使用parse_response解析http请求
客户端rpc协议实现
在http协议上使用_Method包装请求:
class ServerProxy:
def __getattr__(self, name):
# magic method dispatcher
return _Method(self.__request, name)
class _Method:
# some magic to bind an XML-RPC method to an RPC server.
# supports "nested" methods (e.g. examples.getStateName)
def __init__(self, send, name):
self.__send = send
self.__name = name
def __getattr__(self, name):
return _Method(self.__send, "%s.%s" % (self.__name, name))
def __call__(self, *args):
return self.__send(self.__name, args)
可以使用 server.currentTime.getCurrentTime()
发送请求,这是一个链式调用。server.currentTime会调用 ServerProxy.__getattr__
得到一个_Method对象;继续调用getCurrentTime会执行 _Method.__getattr__
又得到一个_Method对象,最后使用 getCurrentTime()
执行这个method对象的call方法,会使用ServerProxy的call方法将请求发送出去。
客户端multi-call实现
了解客户端 single-call 实现后,继续查看 multi-call,主要涉及下面3个类:
class _MultiCallMethod:
def __init__(self, call_list, name):
self.__call_list = call_list
self.__name = name
def __getattr__(self, name):
return _MultiCallMethod(self.__call_list, "%s.%s" % (self.__name, name))
def __call__(self, *args):
self.__call_list.append((self.__name, args)) # 添加一个call
class MultiCallIterator:
def __init__(self, results):
self.results = results
def __getitem__(self, i):
item = self.results[i]
if type(item) == type({}):
raise Fault(item['faultCode'], item['faultString'])
elif type(item) == type([]):
return item[0]
else:
raise ValueError("unexpected type in multicall result")
class MultiCall:
def __init__(self, server):
self.__server = server
self.__call_list = []
def __getattr__(self, name):
return _MultiCallMethod(self.__call_list, name)
def __call__(self):
marshalled_list = []
for name, args in self.__call_list:
marshalled_list.append({'methodName' : name, 'params' : args})
# 最后执行system.multical
return MultiCallIterator(self.__server.system.multicall(marshalled_list))
代码行数比较多,和Method一样都是使用python的魔法函数:call, getattr和getitem, 可以对比调用示例体会:
multi = MultiCall(server)
multi.getData()
multi.pow(2,9)
multi.add(1,2)
for response in multi():
print(response)
xmlrpc序列化/反序列化
rpc服务需要跨网络传输,server和client之间的数据还需要进行序列化/反序列化。主要由Marshaller和Unmarshaller两个类实现:
# client.py
class Marshaller:
...
class Unmarshaller:
...
xmlrpc支持下面9种数据类型:
-
array -
base64 -
boolean -
date/time -
double -
integer -
string -
struct -
nil
一些数据类型的,比如double和nil在python中是不存在的。这2种数据的编/解码如下:
class Marshaller:
def dump_double(self, value, write):
write("<value><double>")
write(repr(value))
write("</double></value>\n")
dispatch[float] = dump_double
def dump_nil (self, value, write):
if not self.allow_none:
raise TypeError("cannot marshal None unless allow_none is enabled")
write("<value><nil/></value>")
dispatch[type(None)] = dump_nil
class Unmarshaller:
def end_double(self, data):
self.append(float(data)) # float
self._value = 0
dispatch["double"] = end_double
dispatch["float"] = end_double
def end_nil (self, data):
self.append(None) # None
self._value = 0
dispatch["nil"] = end_nil
小结
xmlrpc不考虑tcp协议的情况下,主要是2层模型,底层是http协议,上层是xmlrpc协议。http协议负责网络传输;xmlrpc协议负责将rpc请求转换成xml数据,然后再反序列化成请求执行。
参考链接
-
https://en.wikipedia.org/wiki/XML-RPC
以上是关于10 分钟入门 python rpc 实现的主要内容,如果未能解决你的问题,请参考以下文章
快速入门:十分钟学会PythonTutorial - Learn Python in 10 minutes