第七节——实现客户端流式GRPC

Posted 想学习安全的小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第七节——实现客户端流式GRPC相关的知识,希望对你有一定的参考价值。

第七章——使用客户端流式 gRPC 分块上传文件

7.1、在proto文件中定义client-streaming RPC

  1. 在laptop_service.proto文件中定义新的消息UploadImageRequest
    • 使用oneof关键词定义属性data,里面可以是ImageInfo消息,也可以是chunk_data
message UploadImageRequest
    oneof data
        ImageInfo info=1;
        bytes chunk_data=2;
    


message ImageInfo
    string laptop_id=1;
    string image_type=2;

  1. 定义一条UploadImageResponse消息,一旦服务器接收到所有图像块,该消息将返回给客户端
    • 此消息包含服务器生成的图像 ID,以及上传图像的总大小(以字节为单位)。
message UploadImageResponse 
  string id = 1;
  uint32 size = 2;

  1. 定义UploadImageRPC
service LaptopService 
  ...
  rpc UploadImage(stream UploadImageRequest) returns (UploadImageResponse) ;

  1. 运行命令:make gen 来生成代码

7.2、实现图片存储

  1. 创建文件service/image_store.go
  2. 定义ImageStore接口,接口有一个Save方法
  3. Save方法需要 3 个输入参数:笔记本电脑 ID、图像类型和字节缓冲区给出的图像数据。它会返回已保存图像的 ID
type ImageStore interface 
	Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)

  1. 定义DiskImageStore结构体用于实现接口
type DiskImageStore struct 
	mutex       sync.RWMutex
	imageFolder string
	images      map[string]*ImageInfo


type ImageInfo struct 
	LaptopID string
	Type     string
	Path     string

  1. 编写一个函数来创建一个新的DiskImageStore. 它只有 1 个输入,即图像文件夹。在里面,我们只需要初始化images
func NewDiskImageStore(imageFolder string) *DiskImageStore 
	return &DiskImageStore
		imageFolder: imageFolder,
		images:      make(map[string]*ImageInfo),
	

  1. 实现接口Save()方法
    • 使用uuid.NewRandom()为图像生成一个新的随机 UUID
    • 通过加入图像文件夹、图像 ID 和图像类型来创建存储图像的路径
    • 调用os.Create()创建文件
    • 调用imageData.WriteTo()将图像数据写入创建的文件
func (store *DiskImageStore) Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error) 
	imageID, _ := uuid.NewRandom()

	imagePath := fmt.Sprintf("%s%s%s", store.imageFolder, imageID, imageType)

	file, err := os.Create(imagePath)
	if err != nil 
		return "", fmt.Errorf("cannot create image file: %w", err)
	

	imageData.WriteTo(file)

	store.mutex.Lock()
	defer store.mutex.Unlock()

	store.images[imageID.String()] = &ImageInfo
		LaptopID: laptopID,
		Type:     imageType,
		Path:     imagePath,
	

	return imageID.String(), nil

  1. 将图像存储添加到LaptopServer结构
    • 修改laptop_server.go文件中的LaptopServer结构体,添加imageStore属性
    • 修改NewLaptopServer函数,传递一个ImageStore用于生成LaptopServer结构体
type LaptopServer struct 
	pb.UnimplementedLaptopServiceServer
	laptopStore LaptopStore
	imageStore  ImageStore


func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore) *LaptopServer 
	return &LaptopServer
		laptopStore: laptopStore,
		imageStore:  imageStore,
	

  1. 在server/main.go文件中,重新使用NewLaptopServer方法
func main() 
	laptopStore := service.NewInMemoryLaptopStore()
	imageStore := service.NewDiskImageStore("img")
	laptopServer := service.NewLaptopServer(laptopStore, imageStore)
	grpcServer := grpc.NewServer()
	pb.RegisterLaptopServiceServer(grpcServer, laptopServer)
	listener, _ := net.Listen("tcp", ":8888")
	grpcServer.Serve(listener)

7.3、实现服务端

  1. 在文件service/laptop_server.go中实现在proto中定义的UploadImage方法
    • 调用stream.Recv()接收第一个请求
    • 从请求中获取笔记本电脑 ID 和图像类型(由于proto中使用oneof定义的data所以有两个结构体)。
    • 第一次Recv接收的是图像的ID,之后接收图像的数据
    • 创建一个新的字节缓冲区用于接收图像块数据
    • 使用了一个 for 循环调用stream.Recv()以获取图像数据
    • 使用Write方法将数据写入缓冲区
    • 在for循环之后,我们已经收集了缓冲区中图像的所有数据。所以我们可以调用imageStore.Save()将图像数据保存到存储中并取回图像ID
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error 
	req, err := stream.Recv()
	if err != nil 
		return logError(status.Errorf(codes.Unknown, "cannot receive image info"))
	

	laptopID := req.GetInfo().GetLaptopId()
	imageType := req.GetInfo().GetImageType()
	log.Printf("receive an upload-image request for laptop %s with image type %s", laptopID, imageType)

	imageData := bytes.Buffer
	imageSize := 0

	for 
		log.Print("waiting to receive more data")

		req, err := stream.Recv()
		if err == io.EOF 
			log.Print("no more data")
			break
		
		if err != nil 
			return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
		

		chunk := req.GetChunkData()
		size := len(chunk)

		log.Printf("received a chunk with size: %d", size)

		imageSize += size
		imageData.Write(chunk)
	

	imageID, err := server.imageStore.Save(laptopID, imageType, imageData)
	if err != nil 
		return logError(status.Errorf(codes.Internal, "cannot save image to the store: %v", err))
	

	res := &pb.UploadImageResponse
		Id:   imageID,
		Size: uint32(imageSize),
	

	err = stream.SendAndClose(res)
	if err != nil 
		return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
	

	log.Printf("saved image with id: %s, size: %d", imageID, imageSize)
	return nil

7.4、实现客户端

  1. 在cmd/client/main.go文件中重构createLaptop函数
func createLaptop(laptopClient pb.LaptopServiceClient, laptop *pb.Laptop) 
    ...

  1. 创建单独的函数来测试创建笔记本电脑 RPC 和搜索笔记本电脑 RPC
func testCreateLaptop(laptopClient pb.LaptopServiceClient) 
    createLaptop(laptopClient, sample.NewLaptop())


func testSearchLaptop(laptopClient pb.LaptopServiceClient) 
    for i := 0; i < 10; i++ 
		createLaptop(laptopClient, sample.NewLaptop())
	

	filter := &pb.Filter
		MaxPriceUsd: 3000,
		MinCpuCores: 4,
		MinCpuGhz:   2.5,
		MinRam:      &pb.MemoryValue: 8, Unit: pb.Memory_GIGABYTE,
	

	searchLaptop(laptopClient, filter)

  1. 编写一个新函数来测试上传图像 RPC 并从 main 函数中调用它
    • testUploadImage()函数中,我们首先随机生成一台笔记本电脑,然后调用createLaptop()函数在服务器上创建它
func testUploadImage(laptopClient pb.LaptopServiceClient) 
	laptop := sample.NewLaptop()
	createLaptop(laptopClient, laptop)
	uploadImage(laptopClient, laptop.GetId(), "tmp/laptop.jpg")


func main() 
	conn, _ := grpc.Dial("localhost:8888", grpc.WithInsecure())

	laptopClient := pb.NewLaptopServiceClient(conn)
	testUploadImage(laptopClient)

  1. 将编写uploadImage()将这台笔记本电脑的图像上传到服务器的函数。此函数有 3 个输入参数:笔记本电脑客户端、笔记本电脑 ID 和笔记本电脑图像的路径
    • 调用os.Open()打开图像文件
    • 创建一个超时时间为 5 秒的上下文,并调用laptopClient.UploadImage()函数
    • 创建第一个请求以向服务器发送一些图像信息,其中包括笔记本电脑 ID、图像类型或图像文件的扩展名
    • 调用stream.Send()将第一个请求发送到服务器
    • 将创建一个缓冲区读取器以分块读取图像文件的内容。假设每个块为 1 KB,或 1024 字节。我们将在 for 循环中顺序读取图像数据块
    • 调用reader.Read()读取数据到缓冲区
    • 调用stream.Send()将其发送到服务器
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) 
	file, _ := os.Open(imagePath)
	defer file.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	stream, err := laptopClient.UploadImage(ctx)
	if err != nil 
		log.Fatal("cannot upload image: ", err)
	

	req := &pb.UploadImageRequest
		Data: &pb.UploadImageRequest_Info
			Info: &pb.ImageInfo
				LaptopId:  laptopID,
				ImageType: filepath.Ext(imagePath),
			,
		,
	

	err = stream.Send(req)
	if err != nil 
		log.Fatal("cannot send image info to server: ", err, stream.RecvMsg(nil))
	

	reader := bufio.NewReader(file)
	buffer := make([]byte, 1024)

	for 
		n, err := reader.Read(buffer)
		if err == io.EOF 
			break
		
		if err != nil 
			log.Fatal("cannot read chunk to buffer: ", err)
		

		req := &pb.UploadImageRequest
			Data: &pb.UploadImageRequest_ChunkData
				ChunkData: buffer[:n],
			,
		

		err = stream.Send(req)
		if err != nil 
			log.Fatal("cannot send chunk to server: ", err, stream.RecvMsg(nil))
		
	

	res, err := stream.CloseAndRecv()
	if err != nil 
		log.Fatal("cannot receive response: ", err)
	

	log.Printf("image uploaded with id: %s, size: %d", res.GetId(), res.GetSize())


7.5、启动服务

  1. 先将laptop.jpg图片存放进tmp文件夹中
  2. 启动服务端,命令:make server
  3. 启动客户端,命令:make client

以上是关于第七节——实现客户端流式GRPC的主要内容,如果未能解决你的问题,请参考以下文章

第七节——实现客户端流式GRPC

第六节——实现服务器流式 gRPC

第六节——实现服务器流式 gRPC

第六节——实现服务器流式 gRPC

第七节,初识模块字节码和注释

gRPC - Firestore 如何实现服务器-> 客户端实时流式传输