GO实现高可用高并发分布式系统:使用gRPC实现一对多和多对多交互
Posted tyler_download
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了GO实现高可用高并发分布式系统:使用gRPC实现一对多和多对多交互相关的知识,希望对你有一定的参考价值。
在上一节我们使用gRPC实现了客户端和服务端的一对一通讯,也就是客户端向服务端发出一个请求,服务端返回一个结果。但是在很多场景下可能需要客户端向服务端连续发送多个请求后,服务端才能进行处理然后返回一个结果,例如客户端向服务端发送多个订单号,让服务端对订单号进行记录,然后服务端把所有订单号记录后返回结果;或者是客户端发送一个订单号查询所有大于给定订单号的交易记录,然后服务端返回满足条件的十几条记录等。
我们首先看看服务端给客户端返回多条记录的情形。在gRPC中,可以连续发送多条数据的对象叫stream,该对象支持异步发送,假设客户端要查询所有订单号大于10的交易记录,假设在服务端存储了满足条件的记录有20条,那么服务端可以先返回5条,等5分钟后再返回10条,然后等20分钟后再返回5条,因此客户端在接收记录时需要做相应的异步处理。
我们首先修改proto文件如下:
ervice OrderManagement
rpc getOrder(google.protobuf.StringValue) returns(Order);
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
上面代码中的stream表明,当客户端通过searchOrders接口向服务器发出请求时,它需要通过stream对象来获取一系列从服务器返回的Order数据。按照上一节的方法再次编译proto文件后,我们看看它内容的改变,使用searchOrders作为关键字在生成的pb.go文件中查询我们可以看到如下内容:
type OrderManagementClient interface
GetOrder(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*Order, error)
SearchOrders(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (OrderManagement_SearchOrdersClient, error)
。。。
type OrderManagement_SearchOrdersClient interface
Recv() (*Order, error)
grpc.ClientStream
type orderManagementSearchOrdersClient struct
grpc.ClientStream
func (x *orderManagementSearchOrdersClient) Recv() (*Order, error)
m := new(Order)
if err := x.ClientStream.RecvMsg(m); err != nil
return nil, err
return m, nil
这段代码表明客户端在调用searchOrders接口时它会获得一个名为OrderManagement_SearchOrdersClient的对象,这个对象实现了一个接口叫Recv,我们不难猜测到时候客户端需要调用Recv()来接收服务端返回的一系列Order数据。继续往下查可以看到如下代码:
// OrderManagementServer is the server API for OrderManagement service.
type OrderManagementServer interface
GetOrder(context.Context, *wrappers.StringValue) (*Order, error)
SearchOrders(*wrappers.StringValue, OrderManagement_SearchOrdersServer) error
。。。。
type OrderManagement_SearchOrdersServer interface
Send(*Order) error
grpc.ServerStream
type orderManagementSearchOrdersServer struct
grpc.ServerStream
func (x *orderManagementSearchOrdersServer) Send(m *Order) error
return x.ServerStream.SendMsg(m)
上面代码代码表明,服务端在实现searchOrders接口时需要使用一个名为OrderManagement_SearchOrdersServer的对象,它用于一个接口叫Send,我们不难猜测服务端将调用这个接口给客户端发送一系列Order数据,我们首先看服务端代码的实现,在server/main.go中增加代码如下:
func (s *server) SearchOrders(searchQuery *wrappers.StringValue,
stream pb.OrderManagement_SearchOrdersServer) error
for key, order := range orderMap
log.Print(key, order)
for _, itemStr := range order.Items
log.Print(itemStr)
if strings.Contains(itemStr, searchQuery.Value)
err := stream.Send(&order)
if err != nil
return fmt.Errorf("error sending message to stream: %v", err)
log.Print("Matching Order Found: " + key)
break
return nil //返回nil,gRPC会关闭服务器发往客户端的数据管道
服务端通过实现SearchOrders接口来执行业务逻辑,其中stream的类型为OrderManagement_SearchOrdersServer,它有gRPC框架传给我们,通过前面的分析我们知道它有接口Send, 函数的输入参数searchQuery其实就是客户端发送过来的订单号字符串,代码从该数据结构拿到订单号后,从数据存储中进行查询,把所有查到的满足条件的Order数据通过Send发送给客户端。这里需要注意的是,客户端在接收数据过程中可能由于多种原因中断连接,这时服务端调用Send就会返回错误,同时还需要注意的是当服务端发送完所有数据后,一定要return nil,这样gRPC才会把发送管道给关闭调。
同理我们看客户端的实现,在client/main.go的main函数中添加如下代码:
searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValueValue: "Google")
//如果server 使用stream传输结果,客户端需要使用Recv()接收多个返回
for
searchOrder, err := searchStream.Recv()
if err == io.EOF
log.Print("EOF")
break
if err == nil
log.Print("Search result: ", searchOrder)
从前面代码查询可以看到,客户端调用SearchOrder时会返回一个orderManagementSearchOrdersClient对象,它实现了接口Recv()用来接收服务端发送来的一连串数据,所以在上面代码实现中,我们在for循环中调用Recv()接口不断接收服务端发送的数据,如果数据发送完了,前面服务端通过return nil断掉连接后,客户端就会在调用Recv时得到io.EOF错误,这是就可以中断对Recv()的调用。
以上是客户端发送一个请求,服务端返回一系列结果,我们看看反过来,客户端发送一系列请求,服务端返回一个结果,首先还是修改proto文件,增加一个接口定义:
service OrderManagement
rpc getOrder(google.protobuf.StringValue) returns(Order);
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
updateOrders就是新增加的接口,注意到它对应的输入参数使用了stream来修饰,也就是说客户端会给服务端连续发送一系列Order数据,服务端处理后只返回一个StringValue结构,我们可以使用前面的搜索方法在新编译后的pb.go文件里查询新增加的接口,同样道理,服务端在实现该接口是,也是在一个for循环中使用Recv接口来获取客户端发送的一系列数据,在server/main.go中添加代码如下:
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error
ordersStr := "Updated Order IDs: "
for
order, err := stream.Recv()
if err == io.EOF
//通知客户端不用继续发送
return stream.SendAndClose(&wrapper.StringValueValue: "Orders processed" + ordersStr)
orderMap[order.Id] = *order
log.Printf("Order ID ", order.Id, ": Updated")
ordersStr += order.Id + ", "
代码的实现逻辑跟前面客户端实现的服务请求逻辑一样,相当于服务端和客户端的角色颠倒了一下。这里需要注意的是服务端如何给客户端返回结果,代码中调用了SendAndClose,它把返回结果传输给客户端的同时将连接关闭,于是客户端就不能继续再给服务端发送数据。我们看看客户端的实现,在client/main.go中添加代码如下:
updOrder1 := pb.OrderId: "102", Items:[]string"Google Pixel 3A", "Google Pixel Book", Destination:"Mountain View, CA", Price:1100.00
updOrder2 := pb.OrderId: "103", Items:[]string"Apple Watch S4", "Mac Book Pro", "iPad Pro", Destination:"San Jose, CA", Price:2800.00
updOrder3 := pb.OrderId: "104", Items:[]string"Google Home Mini", "Google Nest Hub", "iPad Mini", Destination:"Mountain View, CA", Price:2200.00
updateStream, err := client.UpdateOrders(ctx)
if err != nil
log.Fatalf("%v.UpdateOrders(_) = , %v", client, err)
if err := updateStream.Send(&updOrder1); err != nil
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
if err := updateStream.Send(&updOrder2); err != nil
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
if err := updateStream.Send(&updOrder3); err != nil
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
updateRes, err := updateStream.CloseAndRecv()
if err != nil
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
log.Printf("Update orders res: %s", updateRes)
客户端先是构造一系列Order数据然后分别调用多次Send传递给服务端,如果客户端没有多余数据要传输后,它调用CloseAndRecv(),这个函数会让服务端的Recv()返回io.EOF错误,然后客户端阻塞等待服务端将处理结果返回。
最后我们看客户端给服务端发送一系列数据,然后服务端返回一系列结果给客户端的情况。假设客户端给服务端发送了一系列订单信息,服务端收到订单信息后,把收货地址相同的货物信息合在一起发送给客户端,我们用shipment表示收货地址相同的货物信息组合。如果客户端发送order1, order2,order3, order4 等4个订单号给服务端,其中order1 ,order3 对应货物的收货地址一样, order2, order4对应的收货地址一样,于是服务端就返回两个shipment结构,第一个对应order1, order3, 第二个对应order2, order4,我们先看proto文件的修改:
service OrderManagement
rpc getOrder(google.protobuf.StringValue) returns(Order);
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
message Order
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
message CombinedShipment
string id = 1;
string status = 2;
repeated Order orderList = 3;
我们先看服务端的实现,在server/main.go中添加如下代码:
func (s *server) ProcessOrder(stream pb.OrderManagement_ProcessOrdersServer) error
batchMarker := 1
var combinedShipmentMap = make(map[string]pb.CombinedShipment)
for
orderId, err := stream.Recv()
log.Printf("Reading Proc order: %s", orderId)
if err == io.EOF
log.Printf("EOF: %s", orderId)
for _, shipment := range combinedShipmentMap
if err := stream.Send(&shipment); err != nil
return err
return nil //返回nil,gRPC框架会关闭调server发送给客户端的管道
if err != nil
log.Println(err)
return err
destination := orderMap[orderId.GetValue()].Destination
shipment, found := combinedShipmentMap[destination]
if found
ord := orderMap[orderId.GetValue()]
shipment.OrdersList = append(shipment.OrderList, &ord)
combinedShipmentMap[destination] = shipment
else
comShip := pb.CombinedShipmentId: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!",
ord := orderMap[orderId.GetValue()]
comShip.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = comShip
log.Print(len(comShip.OrdersList), comShip.GetId())
if batchMarker == orderBatchSize
for _, comb := range combinedShipmentMap
log.Printf("Shipping: %v -> %v", comb.Id, len(comb.OrdersList))
if err := stream.Send(&comb); err != nil
return err
batchMarker = 0
combinedShipmentMap = make(map[string]pb.CombinedShipment)
else
batchMarker++
上面代码实现我们只需要注意几点,首先它使用一个stream对象来完成两个功能,一个功能是调用Recv()来接收客户端发送的多个数据,然后同样是这个对象,继续调用它的Send接口给客户端发送多个数据,也就是一个stream对象既负责接收客户端发送的一系列数据,又负责将服务端的一系列处理结果发送给客户端,把握这一点就行,其他那些业务逻辑无关紧要。
我们再看看客户端的实现,在client/main.go中添加如下代码:
func main()
。。。
channel := make(chan struct)
go asncClientBidirectionalRPC(streamProcOrder, channel)
time.Sleep(time.Milliscond * 1000)
if err := streamProcOrder.Send(&wrapper.StringValueValue: "101"); err != nil
log.Fatalf("%v.Send(%v) = %v", client, "101", err)
if err := streamProcOrder.CloseSend(); err != nil
log.Fatal(err)
channel <- struct
func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct)
for
combinedShipment, errorProcOrder := streamProcOrder.Recv()
if errProcOrder == io.EOF
break
log.Printf("Combined shipment: ", combinedShipment.OrdersList)
<-c
上面代码实现中有一个关键点需要把握,客户端也是通过一个stream对象来完成数据的发送和接收,同时我们要特别注意到,同一个stream对象发送和接收完全可以在异步的条件下同时进行,所有上面代码在主函数main里通过Send发送请求,然后扔出一个goroutine异步接收服务端发送回来的数据,虽然发送和接收同时进行但客户端不用加锁,也就是gRPC框架保证了发送和接收在异步情况下业务逻辑依然不会出错。
相关代码从上一节的github路径可以获取。
以上是关于GO实现高可用高并发分布式系统:使用gRPC实现一对多和多对多交互的主要内容,如果未能解决你的问题,请参考以下文章
go实现高并发高可用分布式系统:设计类似kafka的高并发海量数据存储机制2