python-sproto使用实践分享

Posted 游戏测试技术分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python-sproto使用实践分享相关的知识,希望对你有一定的参考价值。

前言

我上篇文章有简单的介绍了sproto协议>>跳转链接[1],不过不是用sproto原生的打包解包方式,适合用sproto的格式文档的自定义协议,这篇来实践下用原生方式来进行打包解包,文章适合咋们测试非专业程序猿阅读。

介绍

Python-sproto也是上一篇解析库的作者spin6lock这里跳转到他的GitHub[2],我这里介绍的是Python3版本的,要Python2版本切换到主干就OK,先看看目录有哪些文件。

开源目录

一开始我看到这么多文件有点懵,大家别慌,其实作者都封装好了,我们要关注的文件并不多,接下来我把我的使用方法给大家来演示。

安装

Python版本

这里要用的Python版本:3.6 我之前的Python3.7并不支持,所以切换回3.6版本

build动态链接库

从GitHub下载后,目录我们有看到一个setup.py文件,我们要在去对应的目录下执行一个命令:

python3 setup.py build

执行正常的话,之后会生成一个build文件夹,里面有两个文件夹,找到pysproto.cpython-36m-darwin.so,复制或移动到setup.py同一目录下,Win系统后缀是 .dll,Linux的是 .so。

python-sproto使用实践分享
build文件

如果执行build指令过程中出现没有找到**.../include/python3.6m/Python.h文件,可以把Python3目录下的Python.h复制到.../include/python3.6m目录下,比如我Python3的目录是:**.../.pyenv/versions/3.6.9/include/python3.6m/Python.h。

测试动态库调用

作者提供了几个测试文件,我这里拿test.py来演示,test.py的代码有些多,我复制了部分出来。

import pysprotofrom sys import getrefcountfrom pysproto import sproto_create, sproto_type, sproto_encode, sproto_decode, sproto_pack, sproto_unpack, sproto_protocol, sproto_dump, sproto_namefrom binascii import b2a_hex, a2b_heximport unittestclass TestPySproto(unittest.TestCase): def get_st_sp(self): with open("person.spb", "rb") as fh: content = fh.read() sp = sproto_create(content) st = sproto_type(sp, "Person") return st, sp def test_basic_encode_decode(self): st, sp = self.get_st_sp() source = { "name": "crystal", "id":1001, "email":"crystal@example.com", "phone":[ { "type" : 1, "number": "10086", }, { "type" : 2, "number":"10010", }, ], } result = sproto_encode(st, source) expected = a2b_hex("04000000d40700000000070000006372797374616c130000006372797374616c406578616d706c652e636f6d260000000f0000000200000004000500000031303038360f000000020000000600050000003130303130") self.assertEqual(result, expected) #print b2a_hex(result) dest, r = sproto_decode(st, result) self.assertEqual(source, dest) self.assertEqual(r, len(expected)) def test_refcount(self): st, sp = self.get_st_sp() result = a2b_hex("04000000d40700000000070000006372797374616c130000006372797374616c406578616d706c652e636f6d260000000f0000000200000004000500000031303038360f000000020000000600050000003130303130") decode_ret, r = sproto_decode(st, result) self.assertEqual(getrefcount(decode_ret["name"]) - 1, 1)#extra 1 for used in temp args self.assertEqual(getrefcount(decode_ret["phone"]) - 1, 1)#extra 1 for used in temp args self.assertEqual(getrefcount(decode_ret["id"]) - 1, 1)#extra 1 for used in temp args self.assertEqual(getrefcount(decode_ret) - 1, 1)#extra 1 for used in temp args def test_sproto_pack(self): result = a2b_hex("04000000d40700000000070000006372797374616c130000006372797374616c406578616d706c652e636f6d260000000f0000000200000004000500000031303038360f000000020000000600050000003130303130") pack_result = sproto_pack(result) #print len(pack_result) #print b2a_hex(pack_result) expected = a2b_hex("3104d407c40763723f797374616c13fe6372797374616cff00406578616d706c651f2e636f6d26110f02c5040531308f3038360f022806053e3130303130") self.assertEqual(expected, pack_result)if __name__ == "__main__": unittest.main()

运行结果都断言通过,表示调用没问题。

python-sproto使用实践分享
测试结果

接下来看看作者为我们封装好的接口。

实践

Example

作者他给出的例子,填入对应的参数就可以使用,这里的主要是调用SprotoRpc。

 with open('your.spb', 'rb') as fh: content = fh.read() sproto_obj = SprotoRpc(content, content, base_package) #base_package is your package struct name p = sproto_obj.request('client.hello', {'foo':'bar'}) sock.send(p)

SprotoRpc

上面的例子调用的方法在sproto.py文件里,代码就不全展示了,我挑客户端使用部分来说明。

导入pysproto库

import pysproto as core

这个pysproto就是之前用build命令生成的动态链接库

SprotoRpc初始化

作者把整个sproto的encode、decode、pack、unpack都封装在SprotoRpc,例子就是从这里调用的。

class SprotoRpc(object): def __init__(self, c2s_chunk, s2c_chunk, packagename): self._c2s = Sproto(c2s_chunk) self._s2c = Sproto(s2c_chunk) self._package = packagename self._session = {}

例子:xxx.spb文件需要用sproto_dump方法把原来的xxx.sproto协议文件来转换,sproto_dump要用到lua,使用方法在下文。

 with open('sproto_C2S.spb','rb') as f1: content = f1.read() sp = sproto_create(content) st = sproto_type(sp, "package") with open('sproto_S2C.spb','rb') as f2: content1 = f2.read() self.SprotoRpc = SprotoRpc(content, content1, st)

到这里初始化已完成

Request

参考协议

Account 1 { request { accountname 0 : string password 1 : string token 2 : string serverid 3 : integer } response { result 0 : integer code 1 : boolean actorid 2 : integer actorlist 3 : *actorlist level 4 : integer }}

我们要发送数据给服务端就要调用request,参数说明,参考上面的协议:

protoname: cs_card_buy

args: {id : 1001}

session: 服务端返回数据的需要填入,如果上面的协议没有定义response,则不需填入session,session一般是递增的规则。

 def request(self, protoname, args = None, session = 0): sp = self._c2s tag, req, resp = sp.protocol(protoname) header = sp.encode(self._package, {"type":tag, "session":session}) if session and not resp: raise ValueError("proto no response") if session: self._session[session] = resp or True content = sp.encode(req, args) if args else "" return sp.pack(header + content)

我们转入对应的参数后就可以利用socket.send把打包好的数据发送给服务端 SprotoRpc的Response方法这里就不展开介绍,用法一致。

Dispatch

从服务端接收数据用Dispatch方法来解包,data参数传 socket.recv() 接收到数据

 def dispatch(self, data): sp = self._s2c data = sp.unpack(data) header,size = sp.decode(self._package, data) content = data[size:] if header.get("type", 0): # request protoname, req, resp = sp.protocol(header["type"]) result,_ = sp.decode(req, content) if req else None ret = {"type":"REQUEST", "proto": protoname, "msg":result, "session":None} if header.get("session", 0): ret["session"] = header["session"] else: # response session = header["session"] if not session in self._session: raise ValueError("unknown session", session) response = self._session[session] del self._session[session] ret = {"type":"RESPONSE", "session":session, "msg":None} if response != True: ret["msg"], _ = sp.decode(response, content) return ret

实例

这里以上面的checkAccount协议来进行一个打包

sproto_obj = SprotoRpc.request('checkAccount', { 'accountname':'ajian', 'password':'GameTester', 'token': 'bug', 'serverid':10001, })

打包结果:

b'Ux02x04x02x05x10x01xc7$Nx05ajx8fian
Gxffx00ameTestexe3rx03bug'b'55020402051001c7244e05616a8f69616e0a47ff00616d655465737465e37203627567'

用socket.send把数据发送给服务端,服务端回复的数据:

buff = b'Ux02x01x02x05Ex02x04x01xc7xe2x02x081x04x07 x0b='

利用dispatch来解析:

decode_obj = sproto_obj.dispatch(buff)print(decode_obj)

输出结果:

{'tag': 1, 'type': 'RESPONSE', 'session': 0, 'msg': {'result': 0, 'code': True, 'actorid': 262179652657, 'level': 368}}

生成spb文件

GitHub上的文件目录,下载链接:sprotodump的GitHub[3]

sprotodump

需要安装lua,安装方法比较简单,度娘一下就有,就不写出来了,安装完之后先检查下lua能不能正常运行,没问题之后在命令行CD到sprotodump.lua目录下再执行命令,比如:

lua sprotodump.lua -spb yourFile.proto -o yourFile.spb

使用方法

usage: lua sprotodump.lua <option> <sproto_file1 sproto_file2 ...> [[<out_option> <outfile>] ...] [namespace_option] option:  -cs dump to cSharp code file -spb dump to binary spb file -go dump to go code file -md dump to markdown file out_option: -d <dircetory> dump to speciffic dircetory -o <file> dump to speciffic file -p <package name> set package name(only cSharp code use) namespace_option: -namespace add namespace to type and protocol

结语

sproto协议对比之前我接触的自定义协议,在协议打包解包这块,我之前要自己去写一套打包解包的方法来处理数据,毕竟各公司和项目都会有差异,有用txt文本,也有用xml,还有粘包的,协议头部构造不同等等,花费的精力相对于sproto来说多出了不少,在接触sproto之后,协议这块的业务基本不用去管,复用性很高,一句话,真香!

微信二维码

引用链接

[1] 跳转链接: https://blog.csdn.net/qq_32557025/article/details/112425576
[2] 这里跳转到他的GitHub: https://github.com/spin6lock/python-sproto/tree/py3_test
[3] sprotodump的GitHub: https://github.com/lvzixun/sprotodump


以上是关于python-sproto使用实践分享的主要内容,如果未能解决你的问题,请参考以下文章

Java技术jQuery自定义插件开发实践

分享前端开发常用代码片段

收藏|分享前端开发常用代码片段

关于js----------------分享前端开发常用代码片段

微信小程序代码片段分享

分享几个实用的代码片段(附代码例子)