第八节——实现双向流式GRPC
Posted 想学习安全的小白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第八节——实现双向流式GRPC相关的知识,希望对你有一定的参考价值。
第八章——实现双向流式 gRPC
8.1、定义双向流式 gRPC protobuf
- 在proto/laptop_service.proto文件中定义一个新的消息
RateLaptopRequest
,具有两个属性:笔记本电脑 ID 和分数
message RateLaptopRequest
string laptop_id = 1;
double score = 2;
- 定义一个消息
RateLaptopResponse
,具有三个属性笔记本电脑 ID、这台笔记本电脑的评分次数和平均评分
message RateLaptopResponse
string laptop_id = 1;
uint32 rated_count = 2;
double average_score = 3;
- 定义一个新的双向流 RPC
service LaptopService
...
rpc RateLaptop(stream RateLaptopRequest) returns (stream RateLaptopResponse) ;
- 运行命令:
make gen
8.2、实现评级商店
- 创建新文件
service/rating_store.go
- 定义一个
RatingStore
接口。它有 1 个函数Add
,将笔记本电脑 ID 和分数作为输入,并返回更新的笔记本电脑评级或错误。
type RatingStore interface
Add(laptopID string, score float64) (*Rating, error)
type Rating struct
Count uint32
Sum float64
- 定义
InMemoryRatingStore
结构体来实现接口
type InMemoryRatingStore struct
mutex sync.RWMutex
rating map[string]*Rating
func NewInMemoryRatingStore() *InMemoryRatingStore
return &InMemoryRatingStore
rating: make(map[string]*Rating),
- 为
InMemoryRatingStore
结构体实现Add功能- 要更改存储的内部数据,必须获得一个锁
- 从map中获得笔记本电脑 ID 的评级
- 如果未找到评分,我们只需创建一个 count 为 1 且 sum 为输入分数的新对象。否则,我们将评分计数增加 1 并将分数添加到总和中
- 将更新后的评分放回地图并将其返回给调用者
func (store *InMemoryRatingStore) Add(laptopID string, score float64) (*Rating, error)
store.mutex.Lock()
defer store.mutex.Unlock()
rating := store.rating[laptopID]
if rating == nil
rating = &Rating
Count: 1,
Sum: score,
else
rating.Count++
rating.Sum += score
store.rating[laptopID] = rating
return rating, nil
- 在service/laptop_server.go文件中使用
InMemoryRatingStore
结构体- 向
LaptopServer
结构添加RatingStore属性 - 向
NewLaptopServer()
函数添加RatingStore结构体参数
- 向
type LaptopServer struct
pb.UnimplementedLaptopServiceServer
laptopStore LaptopStore
imageStore ImageStore
ratingStore RatingStore
func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore, ratingStore RatingStore) *LaptopServer
return &LaptopServer
laptopStore: laptopStore,
imageStore: imageStore,
ratingStore: ratingStore,
- 修改cmd/server/main.go文件中的main函数
func main()
laptopStore := service.NewInMemoryLaptopStore()
imageStore := service.NewDiskImageStore("img")
ratingStore := service.NewInMemoryRatingStore()
laptopServer := service.NewLaptopServer(laptopStore, imageStore, ratingStore)
grpcServer := grpc.NewServer()
pb.RegisterLaptopServiceServer(grpcServer, laptopServer)
listener, _ := net.Listen("tcp", ":8888")
grpcServer.Serve(listener)
8.3、服务端代码完善
- 在service/laptop_server.go文件中,实现在proto中定义的服务RateLaptop
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error
return nil
- 使用 for 循环从流中接收多个请求,接受前检查上下文错误以查看它是否已被取消或是否已超过截止日期
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error
for
err := contextError(stream.Context())
if err != nil
return nil
req, err := stream.Recv()
if err == io.EOF
log.Print("no more data")
break
if err != nil
return logError(status.Errorf(codes.Unknown, "cannot receive stream request: %v", err))
return nil
func contextError(ctx context.Context) error
switch ctx.Err()
case context.Canceled:
return logError(status.Error(codes.Canceled, "request is canceled"))
case context.DeadlineExceeded:
return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
default:
return nil
- 从请求中获取笔记本电脑 ID 和分数。在这里写一个日志,说明收到了带有此笔记本电脑 ID 和分数的请求
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error
for
...
laptopID := req.GetLaptopId()
score := req.GetScore()
log.Printf("received a rate-laptop request: id = %s, score = %.2f", laptopID, score)
...
return nil
- 调用
ratingStore.Add()
将新的笔记本电脑分数添加到商店并取回更新的评分对象
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error
for
...
rating, err := server.ratingStore.Add(laptopID, score)
if err != nil
return logError(status.Errorf(codes.Internal, "cannot add rating to the store: %v", err))
...
return nil
- 创建一个
RateLaptopResponse
笔记本电脑 ID 是输入笔记本电脑 ID,从评级对象中获取评级计数,并使用评级的总和和计数计算平均分数.调用stream.Send()
将响应发送给客户端
func (server *LaptopServer) RateLaptop(stream pb.LaptopService_RateLaptopServer) error
for
...
res := &pb.RateLaptopResponse
LaptopId: laptopID,
RatedCount: rating.Count,
AverageScore: rating.Sum / float64(rating.Count),
err = stream.Send(res)
if err != nil
return logError(status.Errorf(codes.Unknown, "cannot send stream response: %v", err))
return nil
8.4、客户端代码完善
- 在sample/random.go文件中添加一个新函数来生成随机笔记本电脑分数
func RandomLaptopScore() float64
return float64(randomInt(1, 10))
- 在cmd/client/main.go文件中定义一个rateLaptop()函数
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error
- 创建一个 5 秒后超时的新上下文,并用函数laptopClient.RateLaptop()调用上下文
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := laptopClient.RateLaptop(ctx)
if err != nil
return fmt.Errorf("cannot rate laptop: %v", err)
...
- 创建一个通道来等待来自服务器的响应
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error
...
waitResponse := make(chan error)
...
- 启动一个新的 go 例程来接收响应。在 go 例程中,我们使用 for 循环,并调用
stream.Recv()
以从服务器获取响应
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error
...
// go routine to receive responses
go func()
for
res, err := stream.Recv()
if err == io.EOF
log.Print("no more responses")
waitResponse <- nil
return
if err != nil
waitResponse <- fmt.Errorf("cannot receive stream response: %v", err)
return
log.Print("received response: ", res)
()
...
- 遍历笔记本电脑列表,并使用输入笔记本电脑 ID 和相应的输入分数为每台笔记本电脑创建一个新请求,之后调用
stream.Send()
将请求发送到服务器
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error
...
// send requests
for i, laptopID := range laptopIDs
req := &pb.RateLaptopRequest
LaptopId: laptopID,
Score: scores[i],
err := stream.Send(req)
if err != nil
return fmt.Errorf("cannot send stream request: %v - %v", err, stream.RecvMsg(nil))
log.Print("sent request: ", req)
...
- 在发送完所有请求后调用
stream.CloseSend()
告诉服务器我们不再发送任何数据。最后从waitResponse
通道中读取并返回接收到的错误。
func rateLaptop(laptopClient pb.LaptopServiceClient, laptopIDs []string, scores []float64) error
...
err = stream.CloseSend()
if err != nil
return fmt.Errorf("cannot close send: %v", err)
err = <-waitResponse
return err
-
写一个
testRateLaptop()
函数来调用rateLaptop()
功能-
假设我们要评价 3 台笔记本电脑,所以我们声明一个切片来保留笔记本电脑的 ID。
-
使用 for 循环生成随机笔记本电脑,将其 ID 保存到切片中,
-
调用
createLaptop()
函数在服务器上创建它。
-
func testRateLaptop(laptopClient pb.LaptopServiceClient)
n := 3
laptopIDs := make([]string, n)
for i := 0; i < n; i++
laptop := sample.NewLaptop()
laptopIDs[i] = laptop.GetId()
createLaptop(laptopClient, laptop)
...
- 做一个切片来保持分数。我想对这 3 台笔记本电脑进行多轮评分,因此我将在这里使用 for 循环并询问我们是否要进行另一轮评分
func testRateLaptop(laptopClient pb.LaptopServiceClient)
...
scores := make([]float64, n)
for
fmt.Print("rate laptop (y/n)? ")
var answer string
fmt.Scan(&answer)
if strings.ToLower(answer) != "y"
break
for i := 0; i < n; i++
scores[i] = sample.RandomLaptopScore()
err := rateLaptop(laptopClient, laptopIDs, scores)
if err != nil
log.Fatal(err)
- 在 main 函数中,只需调用
testRateLaptop()
,就可以了
func main()
conn, _ := grpc.Dial("localhost:8888", grpc.WithInsecure())
laptopClient := pb.NewLaptopServiceClient(conn)
testRateLaptop(laptopClient)
8.5、启动服务
- 启动服务端,命令:
make server
- 启动客户端,命令:
make client
以上是关于第八节——实现双向流式GRPC的主要内容,如果未能解决你的问题,请参考以下文章