第七节——实现客户端流式GRPC
Posted 想学习安全的小白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第七节——实现客户端流式GRPC相关的知识,希望对你有一定的参考价值。
第七章——使用客户端流式 gRPC 分块上传文件
7.1、在proto文件中定义client-streaming RPC
- 在laptop_service.proto文件中定义新的消息UploadImageRequest
- 使用oneof关键词定义属性data,里面可以是
ImageInfo
消息,也可以是chunk_data
- 使用oneof关键词定义属性data,里面可以是
message UploadImageRequest
oneof data
ImageInfo info=1;
bytes chunk_data=2;
message ImageInfo
string laptop_id=1;
string image_type=2;
- 定义一条
UploadImageResponse
消息,一旦服务器接收到所有图像块,该消息将返回给客户端- 此消息包含服务器生成的图像 ID,以及上传图像的总大小(以字节为单位)。
message UploadImageResponse
string id = 1;
uint32 size = 2;
- 定义
UploadImage
RPC
service LaptopService
...
rpc UploadImage(stream UploadImageRequest) returns (UploadImageResponse) ;
- 运行命令:
make gen
来生成代码
7.2、实现图片存储
- 创建文件
service/image_store.go
- 定义
ImageStore
接口,接口有一个Save方法 - Save方法需要 3 个输入参数:笔记本电脑 ID、图像类型和字节缓冲区给出的图像数据。它会返回已保存图像的 ID
type ImageStore interface
Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)
- 定义
DiskImageStore
结构体用于实现接口
type DiskImageStore struct
mutex sync.RWMutex
imageFolder string
images map[string]*ImageInfo
type ImageInfo struct
LaptopID string
Type string
Path string
- 编写一个函数来创建一个新的
DiskImageStore
. 它只有 1 个输入,即图像文件夹。在里面,我们只需要初始化images
func NewDiskImageStore(imageFolder string) *DiskImageStore
return &DiskImageStore
imageFolder: imageFolder,
images: make(map[string]*ImageInfo),
- 实现接口
Save()
方法- 使用
uuid.NewRandom()
为图像生成一个新的随机 UUID - 通过加入图像文件夹、图像 ID 和图像类型来创建存储图像的路径
- 调用
os.Create()
创建文件 - 调用
imageData.WriteTo()
将图像数据写入创建的文件
- 使用
func (store *DiskImageStore) Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)
imageID, _ := uuid.NewRandom()
imagePath := fmt.Sprintf("%s%s%s", store.imageFolder, imageID, imageType)
file, err := os.Create(imagePath)
if err != nil
return "", fmt.Errorf("cannot create image file: %w", err)
imageData.WriteTo(file)
store.mutex.Lock()
defer store.mutex.Unlock()
store.images[imageID.String()] = &ImageInfo
LaptopID: laptopID,
Type: imageType,
Path: imagePath,
return imageID.String(), nil
- 将图像存储添加到
LaptopServer
结构- 修改laptop_server.go文件中的LaptopServer结构体,添加imageStore属性
- 修改NewLaptopServer函数,传递一个ImageStore用于生成LaptopServer结构体
type LaptopServer struct
pb.UnimplementedLaptopServiceServer
laptopStore LaptopStore
imageStore ImageStore
func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore) *LaptopServer
return &LaptopServer
laptopStore: laptopStore,
imageStore: imageStore,
- 在server/main.go文件中,重新使用NewLaptopServer方法
func main()
laptopStore := service.NewInMemoryLaptopStore()
imageStore := service.NewDiskImageStore("img")
laptopServer := service.NewLaptopServer(laptopStore, imageStore)
grpcServer := grpc.NewServer()
pb.RegisterLaptopServiceServer(grpcServer, laptopServer)
listener, _ := net.Listen("tcp", ":8888")
grpcServer.Serve(listener)
7.3、实现服务端
- 在文件service/laptop_server.go中实现在proto中定义的UploadImage方法
- 调用
stream.Recv()
接收第一个请求 - 从请求中获取笔记本电脑 ID 和图像类型(由于proto中使用oneof定义的data所以有两个结构体)。
- 第一次Recv接收的是图像的ID,之后接收图像的数据
- 创建一个新的字节缓冲区用于接收图像块数据
- 使用了一个 for 循环调用
stream.Recv()
以获取图像数据 - 使用Write方法将数据写入缓冲区
- 在for循环之后,我们已经收集了缓冲区中图像的所有数据。所以我们可以调用
imageStore.Save()
将图像数据保存到存储中并取回图像ID
- 调用
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error
req, err := stream.Recv()
if err != nil
return logError(status.Errorf(codes.Unknown, "cannot receive image info"))
laptopID := req.GetInfo().GetLaptopId()
imageType := req.GetInfo().GetImageType()
log.Printf("receive an upload-image request for laptop %s with image type %s", laptopID, imageType)
imageData := bytes.Buffer
imageSize := 0
for
log.Print("waiting to receive more data")
req, err := stream.Recv()
if err == io.EOF
log.Print("no more data")
break
if err != nil
return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
chunk := req.GetChunkData()
size := len(chunk)
log.Printf("received a chunk with size: %d", size)
imageSize += size
imageData.Write(chunk)
imageID, err := server.imageStore.Save(laptopID, imageType, imageData)
if err != nil
return logError(status.Errorf(codes.Internal, "cannot save image to the store: %v", err))
res := &pb.UploadImageResponse
Id: imageID,
Size: uint32(imageSize),
err = stream.SendAndClose(res)
if err != nil
return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
log.Printf("saved image with id: %s, size: %d", imageID, imageSize)
return nil
7.4、实现客户端
- 在cmd/client/main.go文件中重构createLaptop函数
func createLaptop(laptopClient pb.LaptopServiceClient, laptop *pb.Laptop)
...
- 创建单独的函数来测试创建笔记本电脑 RPC 和搜索笔记本电脑 RPC
func testCreateLaptop(laptopClient pb.LaptopServiceClient)
createLaptop(laptopClient, sample.NewLaptop())
func testSearchLaptop(laptopClient pb.LaptopServiceClient)
for i := 0; i < 10; i++
createLaptop(laptopClient, sample.NewLaptop())
filter := &pb.Filter
MaxPriceUsd: 3000,
MinCpuCores: 4,
MinCpuGhz: 2.5,
MinRam: &pb.MemoryValue: 8, Unit: pb.Memory_GIGABYTE,
searchLaptop(laptopClient, filter)
- 编写一个新函数来测试上传图像 RPC 并从 main 函数中调用它
- 在
testUploadImage()
函数中,我们首先随机生成一台笔记本电脑,然后调用createLaptop()
函数在服务器上创建它
- 在
func testUploadImage(laptopClient pb.LaptopServiceClient)
laptop := sample.NewLaptop()
createLaptop(laptopClient, laptop)
uploadImage(laptopClient, laptop.GetId(), "tmp/laptop.jpg")
func main()
conn, _ := grpc.Dial("localhost:8888", grpc.WithInsecure())
laptopClient := pb.NewLaptopServiceClient(conn)
testUploadImage(laptopClient)
- 将编写
uploadImage()
将这台笔记本电脑的图像上传到服务器的函数。此函数有 3 个输入参数:笔记本电脑客户端、笔记本电脑 ID 和笔记本电脑图像的路径- 调用
os.Open()
打开图像文件 - 创建一个超时时间为 5 秒的上下文,并调用
laptopClient.UploadImage()
函数 - 创建第一个请求以向服务器发送一些图像信息,其中包括笔记本电脑 ID、图像类型或图像文件的扩展名
- 调用
stream.Send()
将第一个请求发送到服务器 - 将创建一个缓冲区读取器以分块读取图像文件的内容。假设每个块为 1 KB,或 1024 字节。我们将在 for 循环中顺序读取图像数据块
- 调用
reader.Read()
读取数据到缓冲区 - 调用
stream.Send()
将其发送到服务器
- 调用
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string)
file, _ := os.Open(imagePath)
defer file.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := laptopClient.UploadImage(ctx)
if err != nil
log.Fatal("cannot upload image: ", err)
req := &pb.UploadImageRequest
Data: &pb.UploadImageRequest_Info
Info: &pb.ImageInfo
LaptopId: laptopID,
ImageType: filepath.Ext(imagePath),
,
,
err = stream.Send(req)
if err != nil
log.Fatal("cannot send image info to server: ", err, stream.RecvMsg(nil))
reader := bufio.NewReader(file)
buffer := make([]byte, 1024)
for
n, err := reader.Read(buffer)
if err == io.EOF
break
if err != nil
log.Fatal("cannot read chunk to buffer: ", err)
req := &pb.UploadImageRequest
Data: &pb.UploadImageRequest_ChunkData
ChunkData: buffer[:n],
,
err = stream.Send(req)
if err != nil
log.Fatal("cannot send chunk to server: ", err, stream.RecvMsg(nil))
res, err := stream.CloseAndRecv()
if err != nil
log.Fatal("cannot receive response: ", err)
log.Printf("image uploaded with id: %s, size: %d", res.GetId(), res.GetSize())
7.5、启动服务
- 先将laptop.jpg图片存放进tmp文件夹中
- 启动服务端,命令:
make server
- 启动客户端,命令:
make client
以上是关于第七节——实现客户端流式GRPC的主要内容,如果未能解决你的问题,请参考以下文章