第六节——实现服务器流式 gRPC
Posted 想学习安全的小白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第六节——实现服务器流式 gRPC相关的知识,希望对你有一定的参考价值。
第六章——实现服务器流式 gRPC
- 服务器流式即客户端发送一个请求,服务器返回n个请求,客户端解析这n个请求
6.1、将服务器流式 RPC 定义添加到 Protobuf
- 创建文件——
proto/filter_message.proto
syntax = "proto3";
option go_package="../pb;pb";
import "memory_message.proto";
message Filter
double max_price_usd = 1;
uint32 min_cpu_cores = 2;
double min_cpu_ghz = 3;
Memory min_ram = 4;
- 字段解释
- 愿意为笔记本电脑支付的最高价格
- 笔记本电脑 CPU 应具有的最小内核数
- CPU 的最低频率
- RAM 的最小大小
6.2、定义新的服务器流式 RPC
- 在laptop_service.proto文件中定义两个消息
SearchLaptopRequest
仅包含 1 个Filter
字段SearchLaptopResponse
仅包含 1 个Laptop
字段
message SearchLaptopRequest
Filter filter = 1;
message SearchLaptopResponse
Laptop laptop = 1;
- 定义新的grpc服务——SearchLaptop,输入为
SearchLaptopRequest
,输出为SearchLaptopResponse
service LaptopService
rpc CreateLaptop(CreateLaptopRequest) returns (CreateLaptopResponse) ;
rpc SearchLaptop(SearchLaptopRequest) returns (stream SearchLaptopResponse) ;
- 使用makefile生成go代码
6.3、添加搜索功能
- 在文件laptop_store.go中的LaptopStore接口添加一个
Search()
功能- 需要一个过滤器作为输入,以及一个回调函数,以便在找到笔记本电脑时进行报告
- 上下文用于控制请求的期限/超时
type LaptopStore interface
Save(laptop *pb.Laptop) error
Search(ctx context.Context, filter *pb.Filter, found func(laptop *pb.Laptop) error) error
- 为InMemoryLaptopStore结构体实现Search函数
- 由于我们正在读取数据,我们必须获得一个读取锁,然后再解锁它
- 我们遍历商店中的所有笔记本电脑,并检查哪一台符合过滤条件。
- 在检查笔记本电脑是否合格之前,我们会检查上下文错误是否
Cancelled
存在DeadlineExceeded
。如果是,我们应该立即返回,因为请求要么已经超时,要么被客户端取消,所以继续搜索只是浪费时间。 isQualified()
函数将过滤器和笔记本电脑作为输入,如果笔记本电脑满足过滤器,则返回 true。- 当笔记本电脑合格后,我们必须在通过回调函数将其发送给调用者之前对其进行深度复制
func (store *InMemoryLaptopStore) Search(ctx context.Context, filter *pb.Filter, found func(laptop *pb.Laptop) error) error
store.mutex.Lock()
defer store.mutex.Unlock()
for _, laptop := range store.data
if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded
log.Print("context is cancelled")
return nil
if isQualified(filter, laptop)
other, err := deepCopy(laptop)
if err != nil
return err
err = found(other)
if err != nil
return err
return nil
func isQualified(filter *pb.Filter, laptop *pb.Laptop) bool
if laptop.GetPriceUsd() > filter.GetMaxPriceUsd()
return false
if laptop.GetCpu().GetNumberCores() < filter.GetMinCpuCores()
return false
if laptop.GetCpu().GetMinGhz() < filter.GetMinCpuGhz()
return false
if toBit(laptop.GetRam()) < toBit(filter.GetMinRam())
return false
return true
- 由于存在不同类型的内存单元,为了比较 RAM,我们必须编写一个函数将其值转换为最小单位:BIT
func toBit(memory *pb.Memory) uint64
value := memory.GetValue()
switch memory.GetUnit()
case pb.Memory_BIT:
return value
case pb.Memory_BYTE:
return value << 3 // 8 = 2^3
case pb.Memory_KILOBYTE:
return value << 13 // 1024 * 8 = 2^10 * 2^3 = 2^13
case pb.Memory_MEGABYTE:
return value << 23
case pb.Memory_GIGABYTE:
return value << 33
case pb.Memory_TERABYTE:
return value << 43
default:
return 0
6.4、实现服务端
- 在laptop_server.go文件中实现之前proto定义的SearchLaptop服务
- 从请求中获取过滤器。然后我们调用
server.Store.Search()
func (server *LaptopServer) SearchLaptop(req *pb.SearchLaptopRequest, stream pb.LaptopService_SearchLaptopServer) error
filter := req.GetFilter()
log.Printf("receive a search-laptop request with filter: %v", filter)
err := server.laptopStore.Search(stream.Context(), filter,
func(laptop *pb.Laptop) error
res := &pb.SearchLaptopResponseLaptop: laptop
err := stream.Send(res)
if err != nil
return err
log.Printf("sent laptop with id: %s", laptop.GetId())
return nil
)
if err != nil
return status.Errorf(codes.Internal, "unexpected error: %v", err)
return nil
6.5、实现客户端
- 在cmd/client/main.go文件中创建一个函数
createLaptop
用于生成Laptop
func createLaptop(laptopClient pb.LaptopServiceClient)
laptop := sample.NewLaptop()
laptop.Id = ""
req := &pb.CreateLaptopRequest
Laptop: laptop,
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := laptopClient.CreateLaptop(ctx, req)
if err != nil
st, ok := status.FromError(err)
if ok && st.Code() == codes.AlreadyExists
// not a big deal
log.Print("laptop already exists")
else
log.Fatal("cannot create laptop: ", err)
return
log.Printf("created laptop with id: %s", res.Id)
- 在 main 函数中,我们将使用一个 for 循环来创建 10 台随机笔记本电脑和一个新的搜索过滤器
func main()
conn, _ := grpc.Dial("localhost:8888", grpc.WithInsecure())
laptopClient := pb.NewLaptopServiceClient(conn)
for i := 0; i < 10; i++
createLaptop(laptopClient)
filter := &pb.Filter
MaxPriceUsd: 3000,
MinCpuCores: 4,
MinCpuGhz: 2.5,
MinRam: &pb.MemoryValue: 8, Unit: pb.Memory_GIGABYTE,
- 创建函数
searchLaptop()
使用过滤器以及search服务与服务端建立通信- 创建一个超时时间为 5 秒的上下文
- 如果
stream.Recv()
函数调用返回一个文件结束 (EOF) 错误,这意味着它是流的结尾,所以我们只是返回 - 如果一切顺利,我们可以从流中获取笔记本电脑并打印信息
func searchLaptop(laptopClient pb.LaptopServiceClient, filter *pb.Filter)
log.Print("search filter:", filter)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &pb.SearchLaptopRequest
Filter: filter,
stream, _ := laptopClient.SearchLaptop(ctx, req)
for
res, err := stream.Recv()
if err == io.EOF
return
if err != nil
log.Fatal("cannot receive response: ", err)
laptop := res.GetLaptop()
log.Print("- found: ", laptop.GetId())
log.Print(" + brand: ", laptop.GetBrand())
log.Print(" + name: ", laptop.GetName())
log.Print(" + cpu cores: ", laptop.GetCpu().GetNumberCores())
log.Print(" + cpu min ghz: ", laptop.GetCpu().GetMinGhz())
log.Print(" + ram: ", laptop.GetMemory())
log.Print(" + price: ", laptop.GetPriceUsd())
- 在main函数中最后调用searchLaptop函数
func main()
conn, _ := grpc.Dial("localhost:8888", grpc.WithInsecure())
laptopClient := pb.NewLaptopServiceClient(conn)
for i := 0; i < 10; i++
createLaptop(laptopClient)
filter := &pb.Filter
MaxPriceUsd: 3000,
MinCpuCores: 4,
MinCpuGhz: 2.5,
MinRam: &pb.MemoryValue: 8, Unit: pb.Memory_GIGABYTE,
searchLaptop(laptopClient, filter)
6.6、启动服务观察结果
- 启动服务端,命令:
make server
- 启动客服端,命令:
make client
- 服务端结果
- 客户端结果
以上是关于第六节——实现服务器流式 gRPC的主要内容,如果未能解决你的问题,请参考以下文章