python-sproto使用实践分享
Posted 游戏测试技术分享
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python-sproto使用实践分享相关的知识,希望对你有一定的参考价值。
前言
我上篇文章有简单的介绍了sproto协议>>跳转链接[1],不过不是用sproto原生的打包解包方式,适合用sproto的格式文档的自定义协议,这篇来实践下用原生方式来进行打包解包,文章适合咋们测试非专业程序猿阅读。
介绍
Python-sproto也是上一篇解析库的作者spin6lock,这里跳转到他的GitHub[2],我这里介绍的是Python3版本的,要Python2版本切换到主干就OK,先看看目录有哪些文件。
一开始我看到这么多文件有点懵,大家别慌,其实作者都封装好了,我们要关注的文件并不多,接下来我把我的使用方法给大家来演示。
安装
Python版本
这里要用的Python版本:3.6 我之前的Python3.7并不支持,所以切换回3.6版本
build动态链接库
从GitHub下载后,目录我们有看到一个setup.py文件,我们要在去对应的目录下执行一个命令:
python3 setup.py build
执行正常的话,之后会生成一个build文件夹,里面有两个文件夹,找到pysproto.cpython-36m-darwin.so,复制或移动到setup.py同一目录下,Win系统后缀是 .dll,Linux的是 .so。
如果执行build指令过程中出现没有找到**.../include/python3.6m/Python.h文件,可以把Python3目录下的Python.h复制到.../include/python3.6m目录下,比如我Python3的目录是:**.../.pyenv/versions/3.6.9/include/python3.6m/Python.h。
测试动态库调用
作者提供了几个测试文件,我这里拿test.py来演示,test.py的代码有些多,我复制了部分出来。
import pysproto
from sys import getrefcount
from pysproto import sproto_create, sproto_type, sproto_encode, sproto_decode, sproto_pack, sproto_unpack, sproto_protocol, sproto_dump, sproto_name
from binascii import b2a_hex, a2b_hex
import unittest
class TestPySproto(unittest.TestCase):
def get_st_sp(self):
with open("person.spb", "rb") as fh:
content = fh.read()
sp = sproto_create(content)
st = sproto_type(sp, "Person")
return st, sp
def test_basic_encode_decode(self):
st, sp = self.get_st_sp()
source = {
"name": "crystal",
"id":1001,
"email":"crystal@example.com",
"phone":[
{
"type" : 1,
"number": "10086",
},
{
"type" : 2,
"number":"10010",
},
],
}
result = sproto_encode(st, source)
expected = a2b_hex("04000000d40700000000070000006372797374616c130000006372797374616c406578616d706c652e636f6d260000000f0000000200000004000500000031303038360f000000020000000600050000003130303130")
self.assertEqual(result, expected)
#print b2a_hex(result)
dest, r = sproto_decode(st, result)
self.assertEqual(source, dest)
self.assertEqual(r, len(expected))
def test_refcount(self):
st, sp = self.get_st_sp()
result = a2b_hex("04000000d40700000000070000006372797374616c130000006372797374616c406578616d706c652e636f6d260000000f0000000200000004000500000031303038360f000000020000000600050000003130303130")
decode_ret, r = sproto_decode(st, result)
self.assertEqual(getrefcount(decode_ret["name"]) - 1, 1)#extra 1 for used in temp args
self.assertEqual(getrefcount(decode_ret["phone"]) - 1, 1)#extra 1 for used in temp args
self.assertEqual(getrefcount(decode_ret["id"]) - 1, 1)#extra 1 for used in temp args
self.assertEqual(getrefcount(decode_ret) - 1, 1)#extra 1 for used in temp args
def test_sproto_pack(self):
result = a2b_hex("04000000d40700000000070000006372797374616c130000006372797374616c406578616d706c652e636f6d260000000f0000000200000004000500000031303038360f000000020000000600050000003130303130")
pack_result = sproto_pack(result)
#print len(pack_result)
#print b2a_hex(pack_result)
expected = a2b_hex("3104d407c40763723f797374616c13fe6372797374616cff00406578616d706c651f2e636f6d26110f02c5040531308f3038360f022806053e3130303130")
self.assertEqual(expected, pack_result)
if __name__ == "__main__":
unittest.main()
运行结果都断言通过,表示调用没问题。
接下来看看作者为我们封装好的接口。
实践
Example
作者他给出的例子,填入对应的参数就可以使用,这里的主要是调用SprotoRpc。
with open('your.spb', 'rb') as fh:
content = fh.read()
sproto_obj = SprotoRpc(content, content, base_package) #base_package is your package struct name
p = sproto_obj.request('client.hello', {'foo':'bar'})
sock.send(p)
SprotoRpc
上面的例子调用的方法在sproto.py文件里,代码就不全展示了,我挑客户端使用部分来说明。
导入pysproto库
import pysproto as core
这个pysproto就是之前用build命令生成的动态链接库
SprotoRpc初始化
作者把整个sproto的encode、decode、pack、unpack都封装在SprotoRpc,例子就是从这里调用的。
class SprotoRpc(object):
def __init__(self, c2s_chunk, s2c_chunk, packagename):
self._c2s = Sproto(c2s_chunk)
self._s2c = Sproto(s2c_chunk)
self._package = packagename
self._session = {}
例子:xxx.spb文件需要用sproto_dump方法把原来的xxx.sproto协议文件来转换,sproto_dump要用到lua,使用方法在下文。
with open('sproto_C2S.spb','rb') as f1:
content = f1.read()
sp = sproto_create(content)
st = sproto_type(sp, "package")
with open('sproto_S2C.spb','rb') as f2:
content1 = f2.read()
self.SprotoRpc = SprotoRpc(content, content1, st)
到这里初始化已完成
Request
参考协议
Account 1 {
request {
accountname 0 : string
password 1 : string
token 2 : string
serverid 3 : integer
}
response {
result 0 : integer
code 1 : boolean
actorid 2 : integer
actorlist 3 : *actorlist
level 4 : integer
}
}
我们要发送数据给服务端就要调用request,参数说明,参考上面的协议:
protoname: cs_card_buy
args: {id : 1001}
session: 服务端返回数据的需要填入,如果上面的协议没有定义response,则不需填入session,session一般是递增的规则。
def request(self, protoname, args = None, session = 0):
sp = self._c2s
tag, req, resp = sp.protocol(protoname)
header = sp.encode(self._package, {"type":tag, "session":session})
if session and not resp:
raise ValueError("proto no response")
if session:
self._session[session] = resp or True
content = sp.encode(req, args) if args else ""
return sp.pack(header + content)
我们转入对应的参数后就可以利用socket.send把打包好的数据发送给服务端 SprotoRpc的Response方法这里就不展开介绍,用法一致。
Dispatch
从服务端接收数据用Dispatch方法来解包,data参数传 socket.recv() 接收到数据
def dispatch(self, data):
sp = self._s2c
data = sp.unpack(data)
header,size = sp.decode(self._package, data)
content = data[size:]
if header.get("type", 0):
# request
protoname, req, resp = sp.protocol(header["type"])
result,_ = sp.decode(req, content) if req else None
ret = {"type":"REQUEST", "proto": protoname, "msg":result, "session":None}
if header.get("session", 0):
ret["session"] = header["session"]
else:
# response
session = header["session"]
if not session in self._session:
raise ValueError("unknown session", session)
response = self._session[session]
del self._session[session]
ret = {"type":"RESPONSE", "session":session, "msg":None}
if response != True:
ret["msg"], _ = sp.decode(response, content)
return ret
实例
这里以上面的checkAccount协议来进行一个打包
sproto_obj = SprotoRpc.request('checkAccount', {
'accountname':'ajian',
'password':'GameTester',
'token': 'bug',
'serverid':10001,
})
打包结果:
b'Ux02x04x02x05x10x01xc7$Nx05ajx8fian Gxffx00ameTestexe3rx03bug'
b'55020402051001c7244e05616a8f69616e0a47ff00616d655465737465e37203627567'
用socket.send把数据发送给服务端,服务端回复的数据:
buff = b'Ux02x01x02x05Ex02x04x01xc7xe2x02x081x04x07 x0b='
利用dispatch来解析:
decode_obj = sproto_obj.dispatch(buff)
print(decode_obj)
输出结果:
{'tag': 1, 'type': 'RESPONSE', 'session': 0, 'msg': {'result': 0, 'code': True, 'actorid': 262179652657, 'level': 368}}
生成spb文件
GitHub上的文件目录,下载链接:sprotodump的GitHub[3]
需要安装lua,安装方法比较简单,度娘一下就有,就不写出来了,安装完之后先检查下lua能不能正常运行,没问题之后在命令行CD到sprotodump.lua目录下再执行命令,比如:
lua sprotodump.lua -spb yourFile.proto -o yourFile.spb
使用方法
usage: lua sprotodump.lua <option> <sproto_file1 sproto_file2 ...> [[<out_option> <outfile>] ...] [namespace_option]
option:
-cs dump to cSharp code file
-spb dump to binary spb file
-go dump to go code file
-md dump to markdown file
out_option:
-d <dircetory> dump to speciffic dircetory
-o <file> dump to speciffic file
-p <package name> set package name(only cSharp code use)
namespace_option:
-namespace add namespace to type and protocol
结语
sproto协议对比之前我接触的自定义协议,在协议打包解包这块,我之前要自己去写一套打包解包的方法来处理数据,毕竟各公司和项目都会有差异,有用txt文本,也有用xml,还有粘包的,协议头部构造不同等等,花费的精力相对于sproto来说多出了不少,在接触sproto之后,协议这块的业务基本不用去管,复用性很高,一句话,真香!
引用链接
[1]
跳转链接: https://blog.csdn.net/qq_32557025/article/details/112425576[2]
这里跳转到他的GitHub: https://github.com/spin6lock/python-sproto/tree/py3_test[3]
sprotodump的GitHub: https://github.com/lvzixun/sprotodump
以上是关于python-sproto使用实践分享的主要内容,如果未能解决你的问题,请参考以下文章