python--websocket数据解析

Posted 来自东方地灵殿的小提琴手

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python--websocket数据解析相关的知识,希望对你有一定的参考价值。

楔子

以前写过一篇关于 rpc 相关的博客,但是很浅显,所以近期准备重新翻写一遍。

什么是 rpc

rpc 指的是远程过程调用(Remote Procedure Call),简单理解就是一个节点请求另一个节点提供的服务。

假设有两台服务器 A 和 B,一个部署在 A 服务器上的应用,想要调用 B 服务器上某个应用提供的函数 / 方法。但由于不在同一个内存空间,所以不能直接调用,而是需要通过网络来表达调用的语义和传达调用的数据。

显然与 rpc 对应的则是本地过程调用,我们本地调用一个函数便是最常见的本地过程调用。

但是很明显,将本地过程调用变成远程过程调用会面临各种各样的问题。

我们以一个简单的本地过程调用(Python 函数)为例:

def add(a, b):
    return a + b 

total = add(1, 2)

我们调用了 add 函数,查看它的字节码的话,会发现分为以下几步:

  • 1. LOAD_NAME: 加载变量 add 指向的函数对象
  • 2. LOAD_CONST: 将 1 和 2 两个整数压入运行时栈
  • 3. CALL_FUNCTION: 进行函数调用,将函数返回值保存在栈顶
  • 4. STORE_NAME: 从栈顶弹出返回值,赋值给 total

当然我们这里不是为了介绍函数的执行过程,只是为了表明在本地调用一个函数是极其简单的,但如果是远程过程调用就不一样了。假设我们上面的 add 函数部署在另一个节点上,那么我们在本地要如何去调用呢?显然这么做的话,我们需要面临如下问题:

1. Call 的 id 映射

远程服务中肯定不止一个函数,那么我们要怎么告诉远程机器,我们调用的是 add 函数,而不是 sub 或者其它的函数呢?首先在本地调用中,我们直接通过函数指针即可,编译器或解释器会自动帮我们找到指针指向的函数。但是在远程调用中是不行的,因为它们不在同一个节点,自然更不在同一进程,而两个进程的地址空间是不一样的。所以在 rpc 中,每个函数必须都有一个唯一的 ID,客户端在远程过程调用时,必须要附上这个 ID。然后客户端和服务端还需要各自维护一个 "函数和 Call id 之间的映射关系",相同的函数对应的 Call id 必须一致。当客户端需要进程远程调用时,根据映射关系找到函数对应的 Call  id,传递给服务端;然后服务端再根据 Call id 找到要调用的函数,然后进行调用。


2. 序列化和反序列化

这个相信你很熟悉,在做 web 开发的时候我们经常会用到。比如 Python 编写的 web 服务返回一个字典,那么它要如何变成 Go 的 map 呢?显然是先将 Python 的字典序列化成 json,然后 Go 再将 json 反序列化成 map。而 json 便是两者之间的媒介,它是一种数据格式,也是一种协议。这在 rpc 中也是同理,因为是远程调用,那么必然要涉及的数据的传输。那么问题来了,我们调用的时候肯定是需要传递参数的,那么这些参数要怎么传递呢?而且客户端和服务端使用的语言也是可以不一样的,比如客户端使用 Python,服务端使用 C++、Java 等等,而不同语言对应的数据结构不同,例如我们不可能在 C++、Java 里面操作 Python 中的字典、类实例等等。

所以还是协议,这是显而易见、而且最直接的解决办法。我们在传递参数的时候可以将内存中的对象序列化成一个可以在网络中传输的二进制对象,这个对象不是某个语言独有的,而是大家都认识。然后传输之后,服务端再将这个对象反序列化成对应语言的数据结构,同理服务端返回内容给客户端也是相同的过程。所以我们还是想到了 http + json,因为 它们用的太广泛了,客户端发送 http 请求,通过 json 传递参数;然后服务端处理来自客户端的请求,并将传递的 json 反序列化成对应的数据结构,并执行相应的逻辑;执行完毕之后,再将返回的结果也序列化成 json 交给客户端,客户端再将其反序列化。显然这是一个非常非常非常通用的流程,而实现了 rpc 的框架(gRPC)也是同样的套路,只不过它没有采用 http + json 的方式,因为这种协议是非常松散的,至于 gRPC 到底用的是什么协议我们后面说。


3. 网络传输

因为是远程调用,那么必然涉及到网络的传输,因此就需要有一个网络传输层。网络传输层需要把 Call id 和序列化的参数字节流返回传递给服务端,服务端逻辑执行完毕之后再将结果序列化并返回给客户端。只要能完成这个过程,那么都可以作为传输层使用。因此 rpc 所使用的协议是可以有多种的,只要能完成传输即可,尽管大部分 rpc 框架使用的都是 TCP 协议,但其实 UDP 也可以,而 gRPC 则直接使用了 HTTP2。另外,Java 的 Netty 也属于这层的东西。

补充一下,为什么 gRPC 没有采用 HTTP 协议。首先在 OSI 网络模型中,TCP 是属于传输层,而 HTTP 是基于 TCP 之上的应用层,所以 HTTP 协议基于 TCP 协议。如果是通过 HTTP 连接能够发起一个请求,那么使用 TCP 连接同样可以,当然我们也可以基于 TCP 协议自己封装一个类似于 HTTP 的应用层协议。

而 HTTP 协议有一个问题,那就是连接是一次性的,服务端一旦返回结果那么连接就断开了。所以很多时候我们更愿意基于 TCP 连接自己去实现,而不是使用 HTTP,因此对于性能要求高的场景,使用 HTTP 是很麻烦的。所以 HTTP 后来发展到了 2.0 版本,它是可以保持长连接的,我们目前用的都是 1.1。而 gRPC 便是基于 HTTP 2.0 设计的,由于 HTTP 2.0 能完成长连接,那么使用它的好处会非常多,因为它是兼容 HTTP 1.1 的,不需要我们自己再基于 TCP 去封装了。

rpc、http 以及 restful 之间的区别

可能这三者之间的概念容易让人产生混淆,下面来区分一下。

rpc 和 http

首先我们说如果想实现 rpc,那么必须要先解决两个问题:

  • 1. 数据的序列化和反序列化
  • 2. 网络传输协议

而 http 协议本身属于网络传输协议的一种,想实现 rpc 也需要依赖网络传输协议。而 rpc 不仅可以使用 http 协议,也可以使用 tcp 协议。所以结论很清晰了,http 协议只是实现 rpc 框架的一种选择,你可以选择它,也可以不选择它,因此这两者不是竞争关系。

rpc 和 restful

rpc 和 restful 之间也不是互斥的,我们通常对外提供服务的时候一般都是通过 http 请求的方式。而任何一种请求都是要有一种规范的,正所谓无规矩不成方圆嘛,而是 restful 便是相应的规范。既然是规范,那么我们完全可以不遵守,所以 restful 只是一种规范而已,它和 rpc 之间实际上是没有太大关系的。

使用 Python 开发 rpc

rpc 技术在架构设计上由四部分组成,分别是:客户端、客户端存根、服务端、服务端存根。

客户端(client):服务调用发起方,也称为服务消费者。

客户端存根(client stub):该程序运行在客户端所在的计算机上,主要用来存储要调用的服务器的地址。另外该程序还负责将客户端请求远程服务器程序的数据信息打包成数据包,通过网络发送给服务端的 stub 程序;其实还要接收服务端 stub 程序发送的调用结果的数据包,并解析返回给客户端。

服务端(server):远端计算机机器上运行的程序,其中有客户端调用的方法。

服务端存根(server stub):和客户端存根作用类似,负责接收客户端 stub 程序通过网络发送的请求消息数据包,并调用服务端相应的方法,完成功能调用;然后将调用的结果打包成数据包,发送给客户端的 stub 程序、

了解完了 rpc 技术的组成结构,我们来看一下具体是如何实现客户端到服务端的调用的。实际上,如果我们想要在网络中的任意两台计算机之间实现远程过程调用,需要解决很多问题,比如:

  • 两台物理机器在网络中要建立稳定可靠的通信连接
  • 两台服务器的通信协议的定义问题,即两台服务器上的程序要如何识别对方的请求和返回结果。也就是说,两台服务器必须都能够识别对方发来的消息,并能够解析出其中的请求含义或返回含义,然后才能进行处理,这其实就是数据通信协议所需要完成的工作

我们通过流程图来说明 rpc 每一步的调用过程:

整体逻辑还是很简单的,解释一下的话就是:

  • 1. 客户端想要发起一个远程过程调用,首先调用本地客户端 stud 程序
  • 2. 客户端 stub 程序接收了客户端的功能调用请求,将客户端请求调用的方法名、携带的参数等信息做序列化操作,打包成数据包
  • 3. 客户端 stub 程序查找远程服务器程序的 IP 地址,通过网络发送给服务端的 stub 程序
  • 4. 服务端 stub 程序接收到客户端发送的数据包信息,并使用和客户端 stub 相同的协议对数据包进行反序列化,得到请求的方法名和请求参数等信息
  • 5. 服务端 stub 程序准备相关数据,调用本地 server 对应的功能方法,并传入相应的参数,进行业务处理
  • 6. 服务端程序根据已有业务逻辑执行调用过程,待业务执行结束,将执行结果返回给服务端 stub 程序
  • 7. 服务端 stub 程序将调用结果按照约定的协议进行序列化,并通过网络发送给客户端 stub 程序
  • 8. 客户端 stub 程序接收到服务端 stub 发送的返回数据,对数据进行反序列化操作,然后将最终结果再交给客户端
  • 9. 客户端请求发起者得到调用结果,整个 rpc 过程结束

rpc 中你需要知道的东西

通过上面的描述,我们已经了解 rpc 是什么以及它的整个流程,我们可以把 rpc 看成是一系列操作的集合。其中包含了很多对数据的操作,以及网络通信,但是有两个细节我们没有说。

  • 1. 客户端存根和服务端存根要怎么生成
  • 2. 序列化和反序列化使用的是什么数据协议(不是 json)

1. 动态代理技术:我们提到的 client stub  和 server stub,在具体的编码和开发实践中,都是通过动态代理技术自动生成的。

2. 序列化和反序列化:在 rpc 的调用过程中,我们可以看到数据需要在一台机器上传输到另外一台机器上。在互联网上,所有数据都是以字节的形式进行传输的,而我们在编程的过程中,往往都是使用数据对象。因此想要在网络上将数据对象和相关变量进行传输,就需要对数据对象进行序列化和反序列化操作。

  • 序列化:把对象转换成字节序列的过程称为对象的序列化,也就是编码的过程
  • 反序列化:把字节序列恢复成对象的过程称为对象的反序列化,也就是解码的过程

而序列化和反序列化也是要遵循相应的数据协议的,比如 json、xml,而 rpc 框架中使用更为广泛的是 Protobuf,这也是数据编解码的一种协议。

Protobuf(Google Protocol Buffers)是 Google 提供的一个语言无关、平台无关、可扩展的,用于序列化结构数据的工具库,它可用于(数据)通信协议、数据存储等。类似于 json,但是比 json 具有更高的转化效率,时间效率和空间效率都是 json 的 3 到 5 倍。并且具有跨语言性,支持:Python、Go、Java、C++、JavaScript 等等。

基于 xmlrpc 库实现一个 rpc

Python 实际上提供了一个内置的库叫做 xmlrpc,从名字上看也是基于 xml 实现的 rpc,也就是它的数据传输是通过 xml 实现的。

from xmlrpc.server import SimpleXMLRPCServer

# 我们看到这里只需要编写业务逻辑,至于函数映射等逻辑是存根所做的事情
# 所以这和 web 服务是不一样的,并且也没有数据的编码和解码,客户端只需要专注于业务逻辑即可
class Vtuber:

    vtubers = ["神乐七奈", "夏色祭", "凑-阿库娅"]

    def show_vtubers(self):
        return self.vtubers

    def add_vtuber(self, vtuber):
        if vtuber not in self.vtubers:
            self.vtubers.append(vtuber)

    def remove_vtuber(self, vtuber):
        if vtuber in self.vtubers:
            self.vtubers.remove(vtuber)


vtuber = Vtuber()
# 调用 SimpleXMLRPCServer,绑定ip和端口
# 如果服务端某个函数返回了 None,那么需要指定 allow_none=True,否则客户端调用时会报错
server = SimpleXMLRPCServer(("localhost", 6666), allow_none=True)
# 将实例对象注册给 rpc server
server.register_instance(vtuber)
server.serve_forever()

从代码来看,暴露出来的接口逻辑还是非常简单的,没有像 web 框架那样需要进行 url 的映射(Django 是 urlconfig、flask 是 route 等等),也不需要显示地进行数据的序列化和反序列化。那么下面来看看客户端的编写:

from xmlrpc import client

# 这里只需要指定服务端的 ip 地址即可,通过 url 的方式
server = client.ServerProxy("http://localhost:6666")

# 然后可以通过 server 直接调用里面的方法,非常的方便
print(server.show_vtubers())  # [\'神乐七奈\', \'夏色祭\', \'凑-阿库娅\']
server.add_vtuber("时雨羽衣")
print(server.show_vtubers())  # [\'神乐七奈\', \'夏色祭\', \'凑-阿库娅\', \'时雨羽衣\']
server.remove_vtuber("凑-阿库娅")
print(server.show_vtubers())  # [\'神乐七奈\', \'夏色祭\', \'时雨羽衣\']

从这里我们感觉似乎 xmlrpc 挺好用的,但如果你通过 web 框架也是同样可以做到 xmlrpc 的效果,只不过 web 框架的目的不在于此。而且 xmlrpc 还有一个局限性,就是客户端只能通过 xmlrpc 提供的客户端去访问,你像浏览器、requests 包都做不到,因为一个是 xml 协议、一个是 http 协议。而 web 框架是暴露出一个 url,可以支持不同的途径去访问,只要你能发送 http 请求即可。所以 web 框架更强调是灵活性,而 rpc 更强调的是本地调用效果,所以 rpc 更常在内部调用。

xmlrpc 库在数据的序列化和反序列化所使用的协议显然是 xml,而除了 xml 还有 json,只不过 Python 官方没有提供基于 json 进行序列化和反序列化的 rpc 库。

显然 rpc 中一个非常重要的一步就是数据的序列化和反序列化,如果你能实现一个更好的数据序列化和反序列化协议,那么你能实现一个更好的 rpc 框架。

zerorpc 实现 rpc 调用

zerorpc 是利用 zeroMQ 消息队列 + msg 消息序列化(二进制)来实现类似于 gRPC(一个 rpc 框架,我们后面重点介绍)的功能,并且还能够跨语言调用(Nodejs 和 Python)。主要使用到 zeroMQ 的通信模式 ROUTER-DEALER,并模拟 grpc 的请求响应式和应答流式 rpc,而且还支持 PUB-SUB 通信模式的远程调用。

但是注意:zerorpc 并不需要我们安装一个 zeroMQ 消息队列,它是一个 Python 第三方库,只是使用了里面的通信模式而已。zerorpc 依赖 msgpack-python pyzmq future greenlet gevent,直接 pip install zerorpc 即可,会自动解决依赖。另外我们看到它依赖 gevent,说明 zerorpc 是支持并发的。

一元调用

一元调用类似于 xmlrpc,创建一个 rpc 服务,将一个类注册到里面,然后监听端口即可。

import zerorpc

class Vtuber:

    vtubers = ["神乐七奈", "夏色祭", "凑-阿库娅"]

    def show_vtubers(self):
        return self.vtubers

    def add_vtuber(self, vtuber):
        if vtuber not in self.vtubers:
            self.vtubers.append(vtuber)

    def remove_vtuber(self, vtuber):
        if vtuber in self.vtubers:
            self.vtubers.remove(vtuber)


vtuber = Vtuber()
server = zerorpc.Server(vtuber)
server.bind("tcp://0.0.0.0:7777")
server.run()

以上是服务端的代码,可以看到不同的框架虽然调用方式不同,但整体都是大同小异的。然后我们编写客户端的代码:

import zerorpc

client = zerorpc.Client()
client.connect("tcp://127.0.0.1:7777")
print(client.show_vtubers())  # [\'神乐七奈\', \'夏色祭\', \'凑-阿库娅\']

在功能上,没有太大的区别,但是很明显速度要比 xmlrpc 快很多。因此如果我们需要在业务上实现一个简单的 rpc,那么 zerorpc 是我们的首选。

流式调用

什么是流式调用呢?举个简单的例子,我们服务里面的某个函数执行了数据查询,那么可以查询完毕之后一次性返回,也可以查询一部分就返回一部分,像水流一样源源不断。

import zerorpc

class StreamingRPC:

    # 必须使用该装饰器进行装饰,否则会有异常
    @zerorpc.stream
    def streaming_range(self, start, end, step=1):
        return range(start, end, step)


server = zerorpc.Server(StreamingRPC())
server.bind("tcp://0.0.0.0:7777")
server.run()

然后是客户端:

import zerorpc

client = zerorpc.Client()
client.connect("tcp://127.0.0.1:7777")
for item in client.streaming_range(1, 10, 2):
    print(item)
"""
1
3
5
7
9
"""

我们目前的客户端服务端通信方式都是通过 tcp 直连的方式,使用 msgpack 将消息序列化之后直接通过 tcp 将数据传送过去,而 zerorpc 是不需要依赖 zeroMQ 消息队列的。但是它也可以选择使用 zeroMQ 消息队列,客户端将消息发送到队列中,服务端从队列中取出消息,业务逻辑执行完毕之后再将结果序列化并放到队列中,客户端再去取。

因此这种方式就实现了解耦,而且也实现了异步,客户端发送完毕之后就没必要等待了。可以先去做别的事情,没事往队列里面看一看,如果有数据就取出来,没有的话就继续干其它的。此外还可以实现限流,只要给消息队列设置一个容量即可,当然也可以实现复杂均衡等等,这个不是我们的重点,我们的重点的是 gRPC。

所以 zerorpc 相对还是比较完善的,支持 Nodejs 和 Python,但为什么我们不选择它呢?首先是生态,zerorpc 的 生态不如 gRPC;以及语言的支持,gRPC 支持绝大部分的语言,而且它的功能要比 zerorpc 更强大。

使用 Go 开发 rpc

下面我们来看看如何在 Go 中进行 rpc 开发,当然有了 Python 开发 rpc 的经验,在使用 Go 的时候就简单多了。

这里我使用的是 go1.14,采用的是 go module,强烈建议你也使用这种包管理方式。因为早期的 Go 在包管理方面实际上做的不是太完善,特别是需要将整个工程目录放在 GOPATH 的 src 下真的是诟病无数。但是自从引入了 go module 就方便许多了,因此后续在贴代码的时候文件所在的层级目录就不说了。

我们以一个简单的 hello world 为例,看看怎么使用 Go 开进行 rpc 的开发。首先编写服务端:

package main

import (
    "fmt"
    "net"
    "net/rpc"
)

// 定义一个简单的结构体,里面不包含任何的成员,第一个例子越简单越好
// 我们可以把这个结构体想象成 Python 中的类
type HelloService struct {

}

// 然后绑定方法
// 接收两个参数: string *string,返回一个 error,参数和返回值类型比较固定,我们没有选择的权利
func (server *HelloService) Hello (request string, reply *string) error {
    // 函数的返回值是一个 error,但显然该服务返回给客户端的数据不可能是一个 error
    // 没错,返回值是用过 reply 指定的,它是一个指针,我们可以修改它
    *reply = fmt.Sprintf("hello %s", request)
    return nil
}

func main() {
    // 接下来要做什么了?显然是实例化一个 Server,然后将 HelloService 注册到 rpc 服务当中,然后客户端调用它绑定的方法
    // 这里叫 listener
    listener, _ := net.Listen("tcp", ":9999")

    // 注册:第一个参数是名字(很重要,介绍客户端的时候会说);第二个参数结构体实例的指针,相当于 Python 中的实例对象
    _ = rpc.RegisterName("Hello Service", &HelloService{})

    // 启动服务,等待连接的到来
    // 而一旦新的连接进来的时候,会返回一个连接,然后 rpc.ServeConn,后续的一系列操作都由该 rpc 来接管了
    conn, _ := listener.Accept()
    rpc.ServeConn(conn)
}

然后是客户端:

package main

import (
    "fmt"
    "net/rpc"
)

func main() {
    // 1. 建立连接
    client, err := rpc.Dial("tcp", "localhost:9999")
    if err != nil {
        fmt.Println("客户端创建失败,失败原因:", err)
        return
    }
    // 调用
    var reply string
    // 通过 client.Call 来实现,第一个参数就是服务端在注册结构体的时候使用的名字(不是结构体的名字),调用的时候在后面加上 .方法名即可
    // 所以我们在服务端注册的时候,起名字还是要规范一些,尽管我们这里出现了空格也没有什么问题
    // 这里我们调用了里面的 Hello 方法,第二个参数和第三个参数就是服务端的方法接收的参数
    if err := client.Call("Hello Service.Hello", "古明地觉", &reply); err != nil {
        fmt.Println("调用失败,失败原因:", err)
        return
    }
    // reply 就是返回值(对于当前逻辑而言),我们传递指针进去,执行完毕之后发现被修改了
    fmt.Println(reply)  // hello 古明地觉
}

整体还是很简单的,但是我们观察一下服务端的最后两个代码:

	// 程序一直阻塞在这里,等待连接
    conn, _ := listener.Accept()
    // 到来之后进行处理,处理完毕之后程序结束
    rpc.ServeConn(conn)

所以我们这里服务端是一次性的,如果想要一直监听的话应该这么做:

    for {
        conn, _ := listener.Accept()
        go rpc.ServeConn(conn)
    }

无限循环,每一个连接单独开启一个 goroutine 去处理。

替换 rpc 的序列化协议为 json

我们演示了基于 Go 的 rpc 实现,但是它存在一些问题,就是客户端在调用的时候必须要知道服务端在注册结构体时指定的名字,所以它相对于 Python 的开发就显得不是太好用了。

另外,Go 和 rpc 可不可以实现跨语言调用呢,它的序列化协议是什么,能否替换成常见的 json 呢。

首先 Go 采用的序列化协议是其独有的 Gob 协议,因此 Python 是无法调用的,但它是可以被替换成 json 的,下面我们就来实现一下。

package main

import (
    "fmt"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
)

type HelloService struct {

}

func (server *HelloService) Hello (request string, reply *string) error {
    *reply = fmt.Sprintf("hello %s", request)
    return nil
}

func main() {
    listener, _ := net.Listen("tcp", ":9999")
    _ = rpc.RegisterName("Hello Service", &HelloService{})
    for {
        conn, _ := listener.Accept()
        // 其它部分不变,这里改成如下
        go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}

然后是客户端,也需要做一些略微的改动:

package main

import (
    "fmt"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
)

func main() {
    // 这里需要使用 net.Dial 进行连接,之前使用 rpc 是因为需要采用 Gob 协议,但是现在我们不需要了
    // 而 net.Dial 的返回结果是连接,就不再是客户端了,当然我们后面要根据这个连接来创建客户端
    conn, err := net.Dial("tcp", "localhost:9999")
    if err != nil {
        fmt.Println("连接建立失败,失败原因:", err)
        return
    }
    var reply string
    // 服务端是 NewServerCodec,客户端是 NewClientCodec 来进行数据的编解码
    client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
    // 然后调用的方式不变,但是序列化之后的数据变了,因为不是同一种协议
    if err := client.Call("Hello Service.Hello", "古明地觉", &reply); err != nil {
        fmt.Println("调用失败,失败原因:", err)
        return
    }
    fmt.Println(reply)  // hello 古明地觉
}

所以此时也是可以实现的,但是问题来了, 既然 Go 的服务端在接收数据和返回数据采用的都是 json,那么就意味着任何支持 json 的语言都是可以调用的。只要我们知道数据的转换格式即可,我们客户端在调用的时候使用的是 client.Call,但是在传输的时候会进行编码,如果能知道数据在编码之后的格式,那么任何支持 json 的语言都可以调用。

而编码格式如下:

{"method": "Hello Service.Hello", "params": ["古明地觉"], "id": 0}

method 是方法;params 是参数,即便只有一个元素也要写成数组,而 id 就是 Call id 了。

既然知道了编码之后的数据结构,那么我们就可以使用 Python 进行调用了。

"""
问一句,我们可以使用 requests 进行发送请求吗?
思考一下就知道是不可以的,因为 requests 发送的请求采用的是 HTTP 协议
发送出去的都是 HTTP 协议的文本,当然本质上也是一个字符串,由 header + 请求内容 组成

但是并不代表我们就不能使用 requests 发送,首先使用 requests 是可以连接到 Go 编写的服务端的
因为 HTTP 连接也是基于 TCP 连接的,只不过 Go 的 tcp 服务只负责解析请求的内容
而 requests 发送的数据除了请求内容之外还有很多的 header
但 Go 的 tcp 服务不负责解析这些 header,它只解析请求内容,所以此时采用 requests 是不行的
并不是它不能够发送请求
"""
from pprint import pprint
import asyncio
import simplejson as json

# 所以下面我们使用 asyncio 来进行模拟,当然你也可以使用 socket
async def f(name: str):
    # 建立 tcp 连接
    reader, writer = await asyncio.open_connection(
        "localhost", 9999)  # type: asyncio.StreamReader, asyncio.StreamWriter
    # 创建请求体,并且需要编码成字节
    payload = json.dumps({"method": "Hello Service.Hello", "params": [name], "id": 0}).encode("utf-8")
    # 发送数据
    writer.write(payload)
    await writer.drain()
    # 读取数据
    data = await reader.readuntil(b"\\n")
    writer.close()
    return json.loads(data)


async def main():
    name_lst = ["古明地觉", "古明地恋", "雾雨魔理沙", "琪露诺", "芙兰朵露"]
    loop = asyncio.get_running_loop()
    task = [loop.create_task(f(name)) for name in name_lst]
    result = await asyncio.gather(*task)
    return result


res = asyncio.run(main())
pprint(res)
"""
[{\'error\': None, \'id\': 0, \'result\': \'hello 古明地觉\'},
 {\'error\': None, \'id\': 0, \'result\': \'hello 古明地恋\'},
 {\'error\': None, \'id\': 0, \'result\': \'hello 雾雨魔理沙\'},
 {\'error\': None, \'id\': 0, \'result\': \'hello 琪露诺\'},
 {\'error\': None, \'id\': 0, \'result\': \'hello 芙兰朵露\'}]
"""

我们看到此时 Python 也是可以调用的,因此我们就基于 rpc 实现了 Python 和 Go 的交互。当然不仅是 Python,你使用其它的任何语言都是可以的,只要它支持 json 协议即可。

当然我们还可以将 rpc 封装成 http 服务,只不过我们需要手动编写客户端存根和服务端存根对应的逻辑,而这些是可以自动生成的。下面我们就来介绍我们重点:gRPC,以及它所使用的数据序列化协议 protobuf。

gRPC 入门

gRPC 是一个高性能、通用的开源 rpc 框架,由 Google 为了面向移动应用开发,而基于 HTTP/2 协议、protobuf(protocol buffers)数据序列化协议所设计。gRPC 是一个通用型框架,适用于微服务开发,支持众多的开发语言,基本上主流语言都支持(都有对应的库)。

gRPC 还提供了一种简单的方法来精确地定义服务,以及为 IOS、Android 和后台支持服务自动生成可靠性很强的客户端功能库。客户端充分利用高级流和链接功能,从而有助于节省带宽、降低 TCP 连接次数、节省 CPU 使用、增加电池寿命等等。

所以 gRPC 是一个 rpc 框架,protobuf 是一个数据序列化反序列化协议,因此 protobuf 是可以独立存在的。比如我们使用 http,我们也可以不返回 json,而是返回一个 protobuf,这也是可以的。

protobuf 简介

  • 1. 习惯了 json、xml 数据存储格式的我们,很少会使用 Protocol Buffer,即便你听说过它,但也很少用它
  • 2. Protocol Buffer 是 Google 开发的一种轻量 & 高效的结构化数据存储格式,性能比 json、xml 强太多
  • 3. protobuf 经历了 protobuf2 和 protobuf3,而 pb3 相比 pb2 简化了许多、变得更加易用,目前主流的版本是 pb3

protobuf 的优缺点

Python 下的 gRPC 初体验

protobuf 还是非常重要的,我们如果想使用 gRPC 服务,就需要先编写一个 protobuf 文件,然后根据这个文件生成对应语言的客户端存根和服务端存根。存根帮我们做好了函数 ID 映射、以及数据序列化反序列化,导入它们即可使用,而我们则只需要专注于业务逻辑即可。

所以使用 gRPC 重点是编写 protobuf 文件,下面我们来感受一下,不过在这之前需要先安装。

pip install grpcio grpcio-tools protobuf -i https://pypi.tuna.tsinghua.edu.cn/simple

下面我们来编写 protobuf 文件,它有自己的语法格式,所以相比 json 它的门槛比较高。我们的文件名就叫 matsuri.proto,protobuf 文件的后缀是 .proto。

// syntax 是指定使用哪一种 protobuf 服务, 现在使用的都是 "proto3"
syntax = "proto3";

// 包名, 这个不是很重要, 你删掉也是无所谓的
package test;

// 编写服务, 每个服务里面有相应的函数(对应 restful 视图函数)
// service 表示创建服务
service Matsuri {
  //使用 rpc 定义函数, 参数名为 matsuri_request, 返回值为 matsuri_response
  rpc hello_matsuri(matsuri_request) returns (matsuri_response){}
}
// 所以我们是创建了一个名为 Matsuri 的服务, 服务里面有一个 hello_matsuri 的函数
// 函数接收一个名为 matsuri_request 的参数, 并返回一个 matsuri_response, 至于结尾的 {} 我们后面再说
// 另外参数 matsuri_request、返回值 matsuri_response 是哪里来的呢? 所以我们还要进行定义

// 注意: matsuri_request 虽然是参数, 但我个人更愿意把它称之为参数的载体
// 比如下面定义两个变量 name 和 age, 客户端会把它们放在 matsuri_request 里面, 在服务端中也会通过 matsuri_request 来获取
message matsuri_request {
  string name = 1; // = 1表示第1个参数
  int32 age = 2;
}

// matsuri_response 同理, 虽然它是返回值, 但我们返回的显然是 result, 只不过需要放在 matsuri_response 里面
// 具体内容在代码中会有体现
message matsuri_response {
  string result = 1;
}

所以有人可能已经发现了,这个 protobuf 文件就是定义一个服务的框架。然后我们就要用这个 protobuf 文件,来生成对应的 Python 服务端和客户端文件。

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. matsuri.proto

--python_out 和 --grpc_python_out 表示输出路径,. 表示输出到当前路径;-I 表示从哪里寻找 protobuf 文件,这里也是当前目录,matsuri.proto 表示转化的 protobuf 文件。

执行完之后我们看到多出了两个文件,这个是自动帮你生成的,matsuri_pb2.py 是给 protobuf 用的,matsuri_pb2_grpc.py 是给 gRPC 用的。而这两个文件可以用来帮助我们编写服务端和客户端,我们来简单尝试一下,具体细节后面会补充。

# 服务端
# 导入 grpc 第三方库
import grpc
# 导入自动生成的两个 py 文件, 还是那句话, matsuri_pb2 是给 protobuf 用的, matsuri_pb2_grpc 是给 grpc 用的
# 这两个文件的名字比较类似, 容易搞混
import matsuri_pb2 as pb2
import matsuri_pb2_grpc as pb2_grpc


# 我们在 protobuf 里面创建的服务叫 Matsuri, 所以 pb2_grpc 会给我们提供一个名为 MatsuriServicer 的类
# 我们直接继承它即可, 当然我们这里的类名叫什么就无所谓了
class Matsuri(pb2_grpc.MatsuriServicer):

    # 我们定义的服务里面有一个 hello_matsuri 的函数
    def hello_matsuri(self, matsuri_request, context):
        """
        matsuri_request 就是相应的参数(载体): name、age都在里面
        当然我们也可以不叫 matsuri_request, 直接叫 request 也是可以的, 它只是一个变量名
        :param request:
        :param context:
        :return:
        """
        name = matsuri_request.name
        age = matsuri_request.age

        # 里面返回是 matsuri_response, 注意: 必须是这个名字, 因为我们在 protobuf 文件中定义的就是 matsuri_response
        # 这个 matsuri_response 内部只有一个字符串类型的 result, result 需要放在 matsuri_response 里面
        return pb2.matsuri_response(result=f"name is {name}, {age} years old")


if __name__ == \'__main__\':
    # 创建一个 gRPC 服务
    # 里面传入一个线程池, 我们这里就启动 4 个线程吧
    from concurrent.futures import ThreadPoolExecutor
    grpc_server = grpc.server(ThreadPoolExecutor(max_workers=4))
    # 将我们定义的类的实例对象注册到 gRPC 服务中, 我们看到这些方法的名字都是基于我们定义 protobuf 文件
    pb2_grpc.add_MatsuriServicer_to_server(Matsuri(), grpc_server)
    # 绑定ip和端口
    grpc_server.add_insecure_port("127.0.0.1:22222")
    # 启动服务
    grpc_server.start()

    # 注意: 如果直接这么启动的话, 会发现程序启动之后就会立刻停止
    # 因为里面的线程应该是守护线程, 主线程一结束服务就没了
    # 所以我们还需要调用一个 wait_fort_termination
    grpc_server.wait_for_termination()

然后我们启动服务端,会发现可以正常启动。

注意:如果你发现报错了,出现如下异常:module \'google.protobuf.descriptor\' has no attribute \'_internal_create_key\',说明是你的 protobuf 版本过低导致的,可以通过升级 protobuf 来解决。

pip install --upgrade protobuf -i https://pypi.tuna.tsinghua.edu.cn/simple

服务端编写完毕,下面我们来编写客户端。

# 客户端
import grpc
import matsuri_pb2 as pb2
import matsuri_pb2_grpc as pb2_grpc


# 定义一个频道, 连接至服务端监听的端口
channel = grpc.insecure_channel("127.0.0.1:22222")
# 生成客户端存根 
client = pb2_grpc.MatsuriStub(channel=channel)

# 然后我们就可以直接调用 Matsuri 服务里面的函数了
print("准备使用服务了~~~~")
while True:
    name, age = input("请输入姓名和年龄, 并使用逗号分割:").split(",")
    # 调用函数, 传入参数 matsuri_request, name 和 age 位于 matsuri_request 中; 因为不能直接发送, 需要序列化成 protobuf
    # 注意: 必须是 matsuri_request, 因为我们在 protobuf 文件定义的就是 matsuri_request
    matsuri_response = client.hello_matsuri(
        pb2.matsuri_request(name=name, age=int(age))
    )
    # result 位于返回值 matsuri_response 中, 直接通过属性访问的形式获取
    # 而之所以能够这么做, 也是客户端存根在背后为我们完成的, 当然这里也可以不叫 matsuri_response, 它只是一个变量名
    print(matsuri_response.result)

下面我们来执行客户端,调用一下试试。

所以整体逻辑还是比较简单的,当然这是因为背后有很多细节都自动帮我们完成了,而核心就是那两个自动生成的文件,我们只需要关注业务逻辑即可。但是注意:自动生成的两个文件,我们不要擅自改动它,除非你对 protobuf 协议非常的了解。

然后问题来了,我们来看看采用 protobuf 协议序列化之后的结果是什么,不是说它比较高效吗?那么我们怎能不看看它序列化之后的结果呢,以及它和 json 又有什么不一样呢?

import matsuri_pb2 as pb2

request = pb2.matsuri_request(name="koishi", age=15)
# 调用 SerializeToString 方法会得到一个二进制的字符串
print(request.SerializeToString())  # b\'\\n\\x07matsuri\\x10\\x10\'

# 这个字符串显然我们看不懂,我们暂时也不去深究它的意义,总之这就是 protobuf 序列化之后的结果
# 而且我们还可以将其反序列化,不然服务端接收到之后也不认识啊
request2 = pb2.matsuri_request()
request2.ParseFromString(b\'\\n\\x06koishi\\x10\\x0f\')
print(request2.name)  # koishi
print(request2.age)  # 15
"""
是可以正常反序列化的,所以我们不认识没关系,protobuf 认识就行
那么 b\'\\n\\x06koishi\\x10\\x0f\' 到底是啥意思呢?
首先里面的 \\x06 表示后面的 6 个字符代表 name 参数的值,而之所以是 name 不是 age
是因为我们在定义 protobuf 文件的时候,name 参数的位置是第 1 个
而 \\x0f 就是 16 进制的 15
"""
# 然后来看看 json
import simplejson as json
print(json.dumps({"name": "koishi", "age": 15}).encode("utf-8"))  # b\'{"name": "koishi", "age": 15}\'

# 可以看到 protobuf 协议序列化之后的结果要比 json 短, 平均能得到一倍的压缩

以上便是 Python 实现简单的 gRPC 服务,可以看到至少在业务层面还是比较简单的。

grpc 文件的 import 问题

我们看一下自动生成的 grpc 文件,里面存在一个问题。

import matsuri_pb2 as matsuri__pb2

在 matsuri_pb2_grpc.py 中导入了 matsuri_pb2.py,因为 grpc 需要使用 protobuf,但是这样导包是不是存在一些问题呢?首先这两个文件必须要在同一个目录,这是毋庸置疑的,否则会导入失败。但是这行代码还隐含了我们的客户端和服务端代码也要和这两个文件在同一个目录,但如果不是这样会有什么后果呢?我们画一张图:

假设我们的工程目录叫 dir1,我们的客户端、服务端代码位于该目录中,但是自动生成的两个 py 文件我们将其放到一个单独的目录 dir2 中。这个时候如果客户端或服务端导入 matsuri_pb2_grpc.py 的时候会发生什么问题?显然会报错,因为客户端或服务端在导入的时候,工作区是 dir1 目录,然后 matsuri_pb2_grpc 中再 import matsuri_pb2 显然就找不到了,因为它位于 dir2 目录中。

所以如果我们要将其放在一个单独的目录中(假设叫 grpc_helper),那么我们应该将 matsuri_pb2_grpc 中的导入逻辑改成这样子:

from . import matsuri_pb2 as matsuri__pb2

然后客户端和服务端使用下面的方式导入:

from grpc_helper import matsuri_pb2 as pb2
from grpc_helper import matsuri_pb2_grpc as pb2_grpc

这样做就没有问题了,所以要注意相应的导包逻辑。但是这并不能算是一个 bug,因为 Python 的导包逻辑就是如此,而在生成文件的时候它也显然不可能猜测出你要把文件放在哪个目录中,所以这种情况我们需要手动设置。

go 下的 gRPC 初体验

看完了 Python 的 gRPC,我们再来看看 Go 的 gRPC,既然了解了 Python 的 gRPC,那么再看 Go 的 gRPC 会简单很多。

首先要安装 protoc,它是 protobuf 的编译工具,它适用于所有的语言,我们去 https://github.com/protocolbuffers/protobuf/releases 下载对应操作系统的 protoc 即可,这里我下载的是 protoc-3.14.0-win64.zip。解压之后,将里面的 bin 目录添加到环境变量中。

我们生成 Python 对应的文件时,是通过 grpc_tools 这个模块来帮我们生成的,它里面实现了 protoc 的功能。但是对于 Go 而言,我们需要使用 protoc 这个编译工具,当然这个工具也适用于 Python。

然后安装 Go 的第三方包:

go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u google.golang.org/grpc

安装之后,我们来编写 protobuf 文件,我们说 protobuf 是支持多语言的,所以只需要做一些简单的修改即可。

syntax = "proto3";
// 这里需要指定我们生成的是 go 的 package,其它语言也是同理
// 然后里面的 ".;yoyoyo" 是什么鬼? 首先 go 的源文件一定要有一个 package,而 .; 后面的 yoyoyo 便是指定对应的包名
option go_package = ".;yoyoyo";

// 注意:由于 Go 的可导出性,我们需要将名称改成大写,而且最好遵循驼峰命名法
service Matsuri {
  rpc HelloMatsuri(MatsuriRequest) returns (MatsuriResponse){}
}

message MatsuriRequest {
  string name = 1; // = 1表示第1个参数
  int32 age = 2;
}

message MatsuriResponse {
  string result = 1;
}

然后生成 Go 的源文件,命令如下:

protoc --go_out=plugins=grpc:. -I . matsuri.proto

相信这些命令不需要解释了,都是写死的,执行完之后会发现自动帮你生成了一个 matsuri.pb.go 源文件。注意:只有一个文件,不光是 Go,其它语言都只生成一个文件,只有 Python 是两个文件。

所以在 Go 里面不存在类似于 Python 中的导包问题,因为它只有一个文件。

由于我们生成的 Go 文件中的 package 是 yoyoyo,那么我们最好新建一个目录 yoyoyo,然后将其放在里面。然后我们来编写服务端和客户端代码,首先是服务端。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "matsuri/yoyoyo"
    "net"
)

type Matsuri struct {

}
// 接收两个参数,和 Python 类似,Python 的 hello_matsuri 函数的第一个参数是 matsuri_request,第二个参数是 context
// matsuri_request 就是实际数据的载体,它是 matsuri_pb2.matsuri_request(),而 context 我们没有用上,后面会介绍
// 在 Go 中也是如此,只不过这两个参数是相反的。
// 第一个参数是 context(这里叫 ctx),它是 context.Context 类型;第二个参数是 matsuri_request,类型是 *yoyoyo.MatsuriRequest(通过包名去调用)
// 然后返回值为 *yoyoyo.MatsuriResponse 和 error
func (m *Matsuri) HelloMatsuri (ctx context.Context, matsuri_request *yoyoyo.MatsuriRequest) (*yoyoyo.MatsuriResponse, error) {
    // 我们看到在 proto 文件中是小写的 name 和 age,但是在生成文件的时候自动帮我们变成了 Name 和 Age,所以 protoc 在生成文件的时候会自动帮我们处理不同语言的逻辑
    name := matsuri_request.Name
    age := matsuri_request.Age
    return &yoyoyo.MatsuriResponse{Result: fmt.Sprintf("name: %s, age %d", name, age)}, nil
}

func main() {
    // 创建服务端
    server := grpc.NewServer()
    // 注册, 第一个参数是服务端, 第二个参数是必须实现 HelloMatsuri(context.Context, *MatsuriRequest) (*MatsuriResponse, error) 方法的接口
    yoyoyo.RegisterMatsuriServer(server, &Matsuri{})
    // 监听端口
    listener, err := net.Listen("tcp", ":33333")
    if err != nil {
        fmt.Println(err)
        return
    }
    // 开启服务,每来一个连接,内部会开一个协程去处理
    if err = server.Serve(listener); err != nil {
        fmt.Println(err)
        return
    }
}

服务端编写完毕之后直接启动,发现没有问题,接下来我们编写客户端。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "matsuri/yoyoyo"
)

func main() {
    conn, err := grpc.Dial("127.0.0.1:33333", grpc.WithInsecure())
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() { _ = conn.Close() }()
    client := yoyoyo.NewMatsuriClient(conn)
    response, _ := client.HelloMatsuri(
        context.Background(),
        &yoyoyo.MatsuriRequest{Name: "夏色祭", Age: 16},
    )
    fmt.Println(response.Result)  // name: 夏色祭, age 16
}

我们看到整体没有任何问题,还是很简单的。

Python 和 Go 相互调用

下面我们来看看如何实现 Python 和 Go 之间的互相调用,Python 编写服务端,Go 去访问;Go 编写服务端,Python 去访问。

注意:如果实现互相调用,那么它们 proto 文件中的类、方法等信息要完全一致。

下面来编写 proto 文件,为了方便这里的 proto 文件名我们就不改了,还叫原来的 matsuri.proto:

syntax = "proto3";

// 如果是 Go 的话,那么只需要加上 option go_package = ".;包名";
option go_package = ".;包名";
service Mea {
  rpc HelloMea(request) returns (response){}
}
message request {
  string name = 1;
  int32 age = 2;
}

message response {
  string result = 1;
}

编写 Go 的服务端:

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "matsuri/yoyoyo"
    "net"
)

type Mea struct {

}
func (m *Mea) HelloMea (ctx context.Context, request *yoyoyo.Request) (*yoyoyo.Response, error) {
    // 参数 request 是小写, 这里自动帮我们变成了大写,同理还有下面的 response
    name := request.Name
    age := request.Age
    return &yoyoyo.Response{Result: fmt.Sprintf("你好: %s, %d 岁的单亲妈妈", name, age)}, nil
}

func main() {
    // 创建服务端
    server := grpc.NewServer()
    yoyoyo.RegisterMeaServer(server, &Mea{})
    listener, err := net.Listen("tcp", ":33333")
    if err != nil {
        fmt.Println(err)
        return
    }
    if err = server.Serve(listener); err != nil {
        fmt.Println(err)
        return
    }
}

编写 Python 的服务端:

import grpc
import matsuri_pb2_grpc as pb2_grpc
import matsuri_pb2 as pb2


class Mea(pb2_grpc.MeaServicer):

    def HelloMea(self, request, context):
        name = request.name
        age = request.age

        return pb2.response(result=f"name is {name}, {age} years old")


if __name__ == \'__main__\':
    from concurrent.futures import ThreadPoolExecutor
    grpc_server = grpc.server(ThreadPoolExecutor(max_workers=4))
    pb2_grpc.add_MeaServicer_to_server(Mea(), grpc_server)
    grpc_server.add_insecure_port("127.0.0.1:22222")
    grpc_server.start()
    grpc_server.wait_for_termination()

两个语言的服务端,我们就编写完毕了,Python 服务端监听 22222 端口,Go 服务端监听 33333 端口,那么下面来编写客户端。Python 客户端调用 Go 服务端,Go 客户端调用 Python 服务端。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "matsuri/yoyoyo"
)

func main() {
    // 连接 22222 端口,这是 Python 服务
    conn, err := grpc.Dial("127.0.0.1:22222", grpc.WithInsecure())
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() { _ = conn.Close() }()
    client := yoyoyo.NewMeaClient(conn)
    response, _ := client.HelloMea(
        context.Background(),
        &yoyoyo.Request{Name: "神乐 mea", Age: 38},
    )
    fmt.Println(response.Result)  // name is 神乐 mea, 38 years old
}

我们看到 Go 调用 Python 的服务是完全没有问题的,那么 Python 调用 Go 呢?

import grpc
import matsuri_pb2_grpc as pb2_grpc
import matsuri_pb2 as pb2
channel = grpc.insecure_channel("127.0.0.1:33333")
client = pb2_grpc.MeaStub(channel=channel)
response = client.HelloMea(
    pb2.request(name="神乐 mea", age=38)
)
print(response.result)  # 你好: 神乐 mea, 38 岁的单亲妈妈

此时我们就通过 gRPC 和 protobuf 完成了 Python 和 Go 之间的 rpc 调用。

gRPC 的流模式

上面我们介绍了 gRPC 的使用方式,下面介绍 gRPC 中的 stream,也就是流模式。流模式可以源源不断地推送数据,因此很适合传输一些大数据,或者服务端和客户端之间进行长时间的数据交互。比如客户端可以向服务端订阅一个数据,服务端就可以利用 stream 源源不断地向客户端推送数据。

而流模式总共有以下几种:

  • 服务端数据流模式(Server-side streaming RPC)
  • 客户端数据流模式(Client-side streaming RPC)
  • 双向数据流模式(Bidirectional-side streaming RPC)

而我们上面一直演示的例子,使用的都是简单模式(Simple RPC),下面来介绍剩余的三种流模式。


服务端数据流:

这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断地返回给客户端。


客户端数据流:

与服务端数据流模式相反,这次是客户端不断地向服务端发送数据,而在发送结束后,由服务端返回一个响应,典型的例子是物联网终端向服务器发送数据。比如大棚里面的温度传感器,显然要把里面的温度实时上报给服务器。


双向数据流:

顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是实现实时交互,典型的例子是聊天机器人。

然后我们来编写 proto 文件,实现一下上面的几种流模式。

syntax = "proto3";

option go_package = ".;yoyoyo";

message StreamRequestData {
  string data = 1;
}

message StreamResponseData {
  string data = 1;
}

service StreamTest {
  // 服务端流模式,在返回值前面加上一个 stream
  rpc GetStream(StreamRequestData) returns (stream StreamResponseData){}
  // 客户端流模式,在参数前面加上一个 stream
  rpc PutStream(stream StreamRequestData) returns (StreamResponseData){}
  // 双向流模式
  rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData){}
}

// 所以我们看到一个服务里面的方法可以有很多个,并且这里面的参数和返回值都是 StreamRequestData 和 StreamResponseData
// 但是不同的方法,我们也可以指定不同的参数

下面我们就来生成对应的 Go 源文件,然后编写对应的服务端,我们以 Go 来演示,Python 也是类似的。

package main

import (
    "fmt"
    "google.golang.org/grpc"
    "matsuri/yoyoyo"
    "net"
)

// 还是定义一个结构体,然后为结构体绑定方法
type Server struct {
}

// 但是这是流模式,绑定的方法里面的参数和返回值还和之间一样吗?我们看一下自动生成的文件吧
/*
type StreamTestServer interface {
	// 服务端流模式,在返回值前面加上一个 stream
	GetStream(*StreamRequestData, StreamTest_GetStreamServer) error
	// 客户端流模式,在参数前面加上一个 stream
	PutStream(StreamTest_PutStreamServer) error
	// 双向流模式
	AllStream(StreamTest_AllStreamServer) error
}
*/
// 我们后面在使用 RegisterStreamTestServer 进行注册的时候,第二个参数接收的实际上是一个接口 StreamTestServer
// 所以如果你想注册成服务的话,那么就必须实现上面三个方法。而且我们看到,在自动生成代码的是帮我们把注释也加上去了
func (s *Server) GetStream(request *yoyoyo.StreamRequestData, res yoyoyo.StreamTest_GetStreamServer) error {
    return nil
}

func (s *Server) PutStream(res yoyoyo.StreamTest_PutStreamServer) error {
    return nil
}

func (s *Server) AllStream(res yoyoyo.StreamTest_AllStreamServer) error {
    return nil
}

func main() {
    // 直接进行 grpc 服务端创建、注册等逻辑没有变化
    server := grpc.NewServer()
    yoyoyo.RegisterStreamTestServer(server, &Server{})
    listener, err := net.Listen("tcp", ":33333")
    if err != nil {
        fmt.Println(err)
        return
    }
    if err = server.Serve(listener); err != nil {
        fmt.Println(err)
        return
    }
}

整体逻辑是没有问题的,但是现在还不能执行,因为方法里面只返回了一个 nil。这里我们看到流模式对应函数的参数的和返回值,与之前的简单模式是不一样的。因为这是肯定的,流模式要求源源不断地返回,所以肯定不能通过 return 语句实现,因此流模式的返回值只有一个 error。了解了这些之后,我们再来编写里面的方法。

func (s *Server) GetStream(request *yoyoyo.StreamRequestData, res yoyoyo.StreamTest_GetStreamServer) error {
    // 那么服务端流模式要如何返回数据呢?答案是通过 res.Send 方法即可
    data := request.Data
    i := 1
    for i < 6{
        // 但 Send 的里面的内容还是 StreamResponseData,因为要被序列化嘛
        _ = res.Send(&yoyoyo.StreamResponseData{Data: fmt.Sprintf("%s%d", data, i)})
        i ++
        time.Sleep(time.Second)
    }
    return nil
}

func (s *Server) PutStream(res yoyoyo.StreamTest_PutStreamServer) error {
    return nil
}

func (s *Server) AllStream(res yoyoyo.StreamTest_AllStreamServer) error {
    return nil
}

这里我们先编写 GetStream,然后编写客户端去访问。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "matsuri/yoyoyo"
)

func main() {
    conn, err := grpc.Dial("127.0.0.1:33333", grpc.WithInsecure())
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() { _ = conn.Close() }()
    client := yoyoyo.NewStreamTestClient(conn)
    // 注意:客户端调用依旧是之前的模式,因为它是基于 proto 文件来的
    response, _ := client.GetStream(
        context.Background(),
        &yoyoyo.StreamRequestData{Data: "神乐"},
    )
    // 然后我们可以进行测试了
    for {
        // 当服务端返回之后,那么 err 会得到一个 EOF
        data, err := response.Recv()
        if err != nil {
            fmt.Println(err)
            break
        }
        fmt.Println(data)
    }
    /*
    data:"神乐1"
    data:"神乐2"
    data:"神乐3"
    data:"神乐4"
    data:"神乐5"
    EOF
    */
}

打印的是一个结构体,我们可以调用里面的 Data 成员,当然这不是重点,重点是数据是实时返回的。


然后是 PutStream,客户端不断向服务端发送数据,这两个过程是相同、但又相反的。

func (s *Server) PutStream(res yoyoyo.StreamTest_PutStreamServer) error {
    for {
        data, err := res.Recv()
        if err != nil {
            break
        }
        fmt.

以上是关于python--websocket数据解析的主要内容,如果未能解决你的问题,请参考以下文章

Relay.js 没有正确解析组合片段

片段(Java) | 机试题+算法思路+考点+代码解析 2023

无法解析片段中的 findViewById [重复]

无法解析片段中的 ViewModelProvider 构造?

Python websocket 客户端 - 从 python 代码向 WebSocketServer 发送消息

RecyclerView未显示已解析的GSON数据