第八节——实现双向流式GRPC

Posted 想学习安全的小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第八节——实现双向流式GRPC相关的知识,希望对你有一定的参考价值。

第八章——实现双向流式 gRPC

8.1、定义双向流式 gRPC protobuf

  1. 在proto/laptop_service.proto文件中定义一个新的消息RateLaptopRequest,具有两个属性:笔记本电脑 ID 和分数
message RateLaptopRequest 
  string laptop_id = 1;
  double score = 2;

  1. 定义一个消息RateLaptopResponse,具有三个属性笔记本电脑 ID、这台笔记本电脑的评分次数和平均评分
message RateLaptopResponse 
  string laptop_id = 1;
  uint32 rated_count = 2;
  double average_score = 3;

  1. 定义一个新的双向流 RPC
service LaptopService 
  ...
  rpc RateLaptop(stream RateLaptopRequest) returns (stream RateLaptopResponse) ;

  1. 运行命令:make gen

8.2、实现评级商店

  1. 创建新文件service/rating_store.go
  2. 定义一个RatingStore接口。它有 1 个函数Add,将笔记本电脑 ID 和分数作为输入,并返回更新的笔记本电脑评级或错误。
type RatingStore interface 
	Add(laptopID string, score float64) (*Rating, error)


type Rating struct 
	Count uint32
	Sum   float64

  1. 定义InMemoryRatingStore结构体来实现接口
type InMemoryRatingStore struct 
    mutex  sync.RWMutex
    rating map[string]*Rating


func NewInMemoryRatingStore() *InMemoryRatingStore 
    return &InMemoryRatingStore
        rating: make(map[string]*Rating),
    

  1. InMemoryRatingStore结构体实现Add功能
    • 要更改存储的内部数据,必须获得一个锁
    • 从map中获得笔记本电脑 ID 的评级
    • 如果未找到评分,我们只需创建一个 count 为 1 且 sum 为输入分数的新对象。否则,我们将评分计数增加 1 并将分数添加到总和中
    • 将更新后的评分放回地图并将其返回给调用者
func (store *InMemoryRatingStore) Add(laptopID string, score float64) (*Rating, error) 
    store.mutex.Lock()
    defer store.mutex.Unlock()

    rating := store.rating[laptopID]
    if rating == nil 
        rating = &Rating
            Count: 1,
            Sum:   score,
        
     else 
        rating.Count++
        rating.Sum += score
    

    store.rating[laptopID] = rating
    return rating, nil

  1. 在service/laptop_server.go文件中使用InMemoryRatingStore结构体
    • LaptopServer结构添加RatingStore属性
    • NewLaptopServer()函数添加RatingStore结构体参数
type LaptopServer struct 
	pb.UnimplementedLaptopServiceServer
	laptopStore LaptopStore
	imageStore  ImageStore
	ratingStore RatingStore


func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore, ratingStore RatingStore) *LaptopServer 
	return &LaptopServer
		laptopStore: laptopStore,
		imageStore:  imageStore,
		ratingStore: ratingStore,
	

  1. 修改cmd/server/main.go文件中的main函数
func main() 
	laptopStore := service.NewInMemoryLaptopStore()
	imageStore := service.NewDiskImageStore("img")
	ratingStore := service.NewInMemoryRatingStore()
	laptopServer := service.NewLaptopServer(laptopStore, imageStore, ratingStore)
	grpcServer := grpc.NewServer()
	pb.RegisterLaptopServiceServer(grpcServer, laptopServer)
	listener, _ := net.Listen("tcp", ":8888")
	grpcServer.Serve(listener)

8.3、服务端代码完善

  1. 在service/laptop_server.go文件中,实现在proto中定义的服务RateLaptop
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error 
    return nil

  1. 使用 for 循环从流中接收多个请求,接受前检查上下文错误以查看它是否已被取消或是否已超过截止日期
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error 
	for 
		err := contextError(stream.Context())
		if err != nil 
			return nil
		

		req, err := stream.Recv()
		if err == io.EOF 
			log.Print("no more data")
			break
		
		if err != nil 
			return logError(status.Errorf(codes.Unknown, "cannot receive stream request: %v", err))
		
	
	return nil


func contextError(ctx context.Context) error 
	switch ctx.Err() 
	case context.Canceled:
		return logError(status.Error(codes.Canceled, "request is canceled"))
	case context.DeadlineExceeded:
		return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
	default:
		return nil
	

  1. 从请求中获取笔记本电脑 ID 和分数。在这里写一个日志,说明收到了带有此笔记本电脑 ID 和分数的请求
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error 
    for 
        ...
        laptopID := req.GetLaptopId()
        score := req.GetScore()

        log.Printf("received a rate-laptop request: id = %s, score = %.2f", laptopID, score)

        ...
    

    return nil

  1. 调用ratingStore.Add()将新的笔记本电脑分数添加到商店并取回更新的评分对象
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error 
    for 
        ...

        rating, err := server.ratingStore.Add(laptopID, score)
        if err != nil 
            return logError(status.Errorf(codes.Internal, "cannot add rating to the store: %v", err))
        
        
        ...

    return nil

  1. 创建一个RateLaptopResponse笔记本电脑 ID 是输入笔记本电脑 ID,从评级对象中获取评级计数,并使用评级的总和和计数计算平均分数.调用stream.Send()将响应发送给客户端
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error 
    for 
        ...
        
        res := &pb.RateLaptopResponse
            LaptopId:     laptopID,
            RatedCount:   rating.Count,
            AverageScore: rating.Sum / float64(rating.Count),
        

        err = stream.Send(res)
        if err != nil 
            return logError(status.Errorf(codes.Unknown, "cannot send stream response: %v", err))
        
    

    return nil

8.4、客户端代码完善

  1. 在sample/random.go文件中添加一个新函数来生成随机笔记本电脑分数
func RandomLaptopScore() float64 
    return float64(randomInt(1, 10))

  1. 在cmd/client/main.go文件中定义一个rateLaptop()函数
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error 

  1. 创建一个 5 秒后超时的新上下文,并用函数laptopClient.RateLaptop()调用上下文
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error 
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := laptopClient.RateLaptop(ctx)
    if err != nil 
        return fmt.Errorf("cannot rate laptop: %v", err)
    

    ...

  1. 创建一个通道来等待来自服务器的响应
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error 
    ...

    waitResponse := make(chan error)

    ...

  1. 启动一个新的 go 例程来接收响应。在 go 例程中,我们使用 for 循环,并调用stream.Recv()以从服务器获取响应
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error 
    ...

    // go routine to receive responses
    go func() 
        for 
            res, err := stream.Recv()
            if err == io.EOF 
                log.Print("no more responses")
                waitResponse <- nil
                return
            
            if err != nil 
                waitResponse <- fmt.Errorf("cannot receive stream response: %v", err)
                return
            

            log.Print("received response: ", res)
        
    ()

    ...

  1. 遍历笔记本电脑列表,并使用输入笔记本电脑 ID 和相应的输入分数为每台笔记本电脑创建一个新请求,之后调用stream.Send()将请求发送到服务器
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error 
    ...

    // send requests
    for i, laptopID := range laptopIDs 
        req := &pb.RateLaptopRequest
            LaptopId: laptopID,
            Score:    scores[i],
        

        err := stream.Send(req)
        if err != nil 
            return fmt.Errorf("cannot send stream request: %v - %v", err, stream.RecvMsg(nil))
        

        log.Print("sent request: ", req)
    

    ...

  1. 在发送完所有请求后调用stream.CloseSend()告诉服务器我们不再发送任何数据。最后从waitResponse通道中读取并返回接收到的错误。
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error 
    ...

    err = stream.CloseSend()
    if err != nil 
        return fmt.Errorf("cannot close send: %v", err)
    

    err = <-waitResponse
    return err

  1. 写一个testRateLaptop()函数来调用rateLaptop()功能

    • 假设我们要评价 3 台笔记本电脑,所以我们声明一个切片来保留笔记本电脑的 ID。

    • 使用 for 循环生成随机笔记本电脑,将其 ID 保存到切片中,

    • 调用createLaptop()函数在服务器上创建它。

func testRateLaptop(laptopClient pb.LaptopServiceClient) 
    n := 3
    laptopIDs := make([]string, n)

    for i := 0; i < n; i++ 
        laptop := sample.NewLaptop()
        laptopIDs[i] = laptop.GetId()
        createLaptop(laptopClient, laptop)
    

    ...

  1. 做一个切片来保持分数。我想对这 3 台笔记本电脑进行多轮评分,因此我将在这里使用 for 循环并询问我们是否要进行另一轮评分
func testRateLaptop(laptopClient pb.LaptopServiceClient) 
    ...

    scores := make([]float64, n)
    for 
        fmt.Print("rate laptop (y/n)? ")
        var answer string
        fmt.Scan(&answer)

        if strings.ToLower(answer) != "y" 
            break
        

        for i := 0; i < n; i++ 
            scores[i] = sample.RandomLaptopScore()
        

        err := rateLaptop(laptopClient, laptopIDs, scores)
        if err != nil 
            log.Fatal(err)
        
    

  1. 在 main 函数中,只需调用testRateLaptop(),就可以了
func main() 
	conn, _ := grpc.Dial("localhost:8888", grpc.WithInsecure())

	laptopClient := pb.NewLaptopServiceClient(conn)
	testRateLaptop(laptopClient)

8.5、启动服务

  1. 启动服务端,命令:make server
  2. 启动客户端,命令:make client

以上是关于第八节——实现双向流式GRPC的主要内容,如果未能解决你的问题,请参考以下文章

基于grpc的流式方式实现双向通讯(python)

扩展 gRPC 双向流式聊天服务

gRPC 客户端流是如何实现的

gRPC 流式调用

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南