galang 学习之grpc+ protobuf

Posted 柳清风09

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了galang 学习之grpc+ protobuf相关的知识,希望对你有一定的参考价值。

上一篇介绍了grpc+ protobuf的一个helloworld,下面介绍一个多接口,并且具有验证功能等辅助一点的例子。这个例子主要是一个坐标使用获取的例子,先看看proto文件定义:

yntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

package routeguide;

// Interface exported by the server.
service RouteGuide 
  //获取单个Feature方法定义
  rpc GetFeature(Point) returns (Feature) 

  // 获取Feature列表方法定义
  rpc ListFeatures(Rectangle) returns (stream Feature) 

  // RecordRoute记录point,客户端通过stream发送到服务端
  rpc RecordRoute(stream Point) returns (RouteSummary) 

  // 双向stream的例子
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) 


// Points定义包括经度纬度
message Point 
  int32 latitude = 1;
  int32 longitude = 2;


// 长方形定义
message Rectangle 
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;


// 位置定义,包括坐标和名称
message Feature 
  // The name of the feature.
  string name = 1;

  // The point where the feature is detected.
  Point location = 2;


//RouteNote包含位置和信息
message RouteNote 
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;


// RouteSummary 路程总结定义
message RouteSummary 
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;

通过命令

protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide

生成客户端和服务端调用文件route_guide.pb.go
下面看服务端实现这些接口代码如下:

package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "math"
    "net"
    "time"
    "strings"
    "golang.org/x/net/context"
    "google.golang.org/grpc"

    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/grpclog"

    "github.com/golang/protobuf/proto"

    pb "google.golang.org/grpc/examples/route_guide/routeguide"
    "path/filepath"
    "os"
    "log"
)
//支持ca认证服务端需要秘钥key以及私钥
var (
    tls        = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
    certFile   = flag.String("cert_file", "testdata/server1.pem", "The TLS cert file")
    keyFile    = flag.String("key_file", "testdata/server1.key", "The TLS key file")
    jsonDBFile = flag.String("json_db_file", "/Users/chenxy/go/src/google.golang.org/grpc/examples/route_guide/testdata/route_guide_db.json", "A json file containing a list of features")
    port       = flag.Int("port", 10000, "The server port")
)

type routeGuideServer struct 
    savedFeatures []*pb.Feature
    routeNotes    map[string][]*pb.RouteNote


// GetFeature returns the feature at the given point.
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) 
    for _, feature := range s.savedFeatures 
        if proto.Equal(feature.Location, point) 
            return feature, nil
        
    
    // No feature was found, return an unnamed feature
    return &pb.FeatureLocation: point, nil


// ListFeatures lists all features contained within the given bounding Rectangle.
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error 
    for _, feature := range s.savedFeatures 
        if inRange(feature.Location, rect) 
            if err := stream.Send(feature); err != nil 
                return err
            
        
    
    return nil


// RecordRoute records a route composited of a sequence of points.
//
// It gets a stream of points, and responds with statistics about the "trip":
// number of points,  number of known features visited, total distance traveled, and
// total time spent.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error 
    var pointCount, featureCount, distance int32
    var lastPoint *pb.Point
    startTime := time.Now()
    for 
        point, err := stream.Recv()
        if err == io.EOF 
            endTime := time.Now()
            return stream.SendAndClose(&pb.RouteSummary
                PointCount:   pointCount,
                FeatureCount: featureCount,
                Distance:     distance,
                ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
            )
        
        if err != nil 
            return err
        
        pointCount++
        for _, feature := range s.savedFeatures 
            if proto.Equal(feature.Location, point) 
                featureCount++
            
        
        if lastPoint != nil 
            distance += calcDistance(lastPoint, point)
        
        lastPoint = point
    


// RouteChat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error 
    for 
        in, err := stream.Recv()
        if err == io.EOF 
            return nil
        
        if err != nil 
            return err
        
        key := serialize(in.Location)
        if _, present := s.routeNotes[key]; !present 
            s.routeNotes[key] = []*pb.RouteNotein
         else 
            s.routeNotes[key] = append(s.routeNotes[key], in)
        
        for _, note := range s.routeNotes[key] 
            if err := stream.Send(note); err != nil 
                return err
            
        
    


// loadFeatures从JSON文件中加载features
func (s *routeGuideServer) loadFeatures(filePath string) 
    file, err := ioutil.ReadFile(filePath)
    if err != nil 
        grpclog.Fatalf("Failed to load default features: %v", err)
    
    if err := json.Unmarshal(file, &s.savedFeatures); err != nil 
        grpclog.Fatalf("Failed to load default features: %v", err)
    


func toRadians(num float64) float64 
    return num * math.Pi / float64(180)


// calcDistance calculates the distance between two points using the "haversine" formula.
// This code was taken from http://www.movable-type.co.uk/scripts/latlong.html.
func calcDistance(p1 *pb.Point, p2 *pb.Point) int32 
    const CordFactor float64 = 1e7
    const R float64 = float64(6371000) // metres
    lat1 := float64(p1.Latitude) / CordFactor
    lat2 := float64(p2.Latitude) / CordFactor
    lng1 := float64(p1.Longitude) / CordFactor
    lng2 := float64(p2.Longitude) / CordFactor
    φ1 := toRadians(lat1)
    φ2 := toRadians(lat2)
    Δφ := toRadians(lat2 - lat1)
    Δλ := toRadians(lng2 - lng1)

    a := math.Sin(Δφ/2)*math.Sin(Δφ/2) +
        math.Cos(φ1)*math.Cos(φ2)*
            math.Sin(Δλ/2)*math.Sin(Δλ/2)
    c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))

    distance := R * c
    return int32(distance)


func inRange(point *pb.Point, rect *pb.Rectangle) bool 
    left := math.Min(float64(rect.Lo.Longitude), float64(rect.Hi.Longitude))
    right := math.Max(float64(rect.Lo.Longitude), float64(rect.Hi.Longitude))
    top := math.Max(float64(rect.Lo.Latitude), float64(rect.Hi.Latitude))
    bottom := math.Min(float64(rect.Lo.Latitude), float64(rect.Hi.Latitude))

    if float64(point.Longitude) >= left &&
        float64(point.Longitude) <= right &&
        float64(point.Latitude) >= bottom &&
        float64(point.Latitude) <= top 
        return true
    
    return false


func serialize(point *pb.Point) string 
    return fmt.Sprintf("%d %d", point.Latitude, point.Longitude)


func newServer() *routeGuideServer 
    s := new(routeGuideServer)
    s.loadFeatures(*jsonDBFile)
    s.routeNotes = make(map[string][]*pb.RouteNote)
    return s


func main() 
    flag.Parse()
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    if err != nil 
        grpclog.Fatalf("failed to listen: %v", err)
    
    var opts []grpc.ServerOption
    if *tls 
        creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
        if err != nil 
            grpclog.Fatalf("Failed to generate credentials %v", err)
        
        opts = []grpc.ServerOptiongrpc.Creds(creds)
    
    grpcServer := grpc.NewServer(opts...)
    pb.RegisterRouteGuideServer(grpcServer, newServer())
    grpcServer.Serve(lis)

服务端实现了ListFeatures等这些方法,为了测试服务端需要预先从JSON文件加载Features,格式如下:

[
    "location": 
        "latitude": 409146138,
        "longitude": -746188906
    ,
    "name": "Berkshire Valley Management Area Trail, Jefferson, NJ, USA"
, 
    "location": 
        "latitude": 404701380,
        "longitude": -744781745
    ,]

先启动服务端,客户端调用代码如下:

package main

import (
    "flag"
    "io"
    "math/rand"
    "time"

    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    pb "google.golang.org/grpc/examples/route_guide/routeguide"
    "google.golang.org/grpc/grpclog"
)
//指定ca证书
var (
    tls                = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
    caFile             = flag.String("ca_file", "testdata/ca.pem", "The file containning the CA root cert file")
    serverAddr         = flag.String("server_addr", "127.0.0.1:10000", "The server address in the format of host:port")
    serverHostOverride = flag.String("server_host_override", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake")
)

// printFeature gets the feature for the given point.
func printFeature(client pb.RouteGuideClient, point *pb.Point) 
    grpclog.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)
    feature, err := client.GetFeature(context.Background(), point)
    if err != nil 
        grpclog.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
    
    grpclog.Println(feature)


// printFeatures lists all the features within the given bounding Rectangle.
func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) 
    grpclog.Printf("Looking for features within %v", rect)
    stream, err := client.ListFeatures(context.Background(), rect)
    if err != nil 
        grpclog.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    
    for 
        feature, err := stream.Recv()
        if err == io.EOF 
            break
        
        if err != nil 
            grpclog.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
        
        grpclog.Println(feature)
    


// runRecordRoute sends a sequence of points to server and expects to get a RouteSummary from server.
func runRecordRoute(client pb.RouteGuideClient) 
    // Create a random number of random points
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
    var points []*pb.Point
    for i := 0; i < pointCount; i++ 
        points = append(points, randomPoint(r))
    
    grpclog.Printf("Traversing %d points.", len(points))
    stream, err := client.RecordRoute(context.Background())
    if err != nil 
        grpclog.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
    
    for _, point := range points 
        if err := stream.Send(point); err != nil 
            grpclog.Fatalf("%v.Send(%v) = %v", stream, point, err)
        
    
    reply, err := stream.CloseAndRecv()
    if err != nil 
        grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    
    grpclog.Printf("Route summary: %v", reply)


// runRouteChat receives a sequence of route notes, while sending notes for various locations.
func runRouteChat(client pb.RouteGuideClient) 
    notes := []*pb.RouteNote
        &pb.PointLatitude: 0, Longitude: 1, "First message",
        &pb.PointLatitude: 0, Longitude: 2, "Second message",
        &pb.PointLatitude: 0, Longitude: 3, "Third message",
        &pb.PointLatitude: 0, Longitude: 1, "Fourth message",
        &pb.PointLatitude: 0, Longitude: 2, "Fifth message",
        &pb.PointLatitude: 0, Longitude: 3, "Sixth message",
    
    stream, err := client.RouteChat(context.Background())
    if err != nil 
        grpclog.Fatalf("%v.RouteChat(_) = _, %v", client, err)
    
    waitc := make(chan struct)
    go func() 
        for 
            in, err := stream.Recv()
            if err == io.EOF 
                // read done.
                close(waitc)
                return
            
            if err != nil 
                grpclog.Fatalf("Failed to receive a note : %v", err)
            
            grpclog.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
        
    ()
    for _, note := range notes 
        if err := stream.Send(note); err != nil 
            grpclog.Fatalf("Failed to send a note: %v", err)
        
    
    stream.CloseSend()
    <-waitc


func randomPoint(r *rand.Rand) *pb.Point 
    lat := (r.Int31n(180) - 90) * 1e7
    long := (r.Int31n(360) - 180) * 1e7
    return &pb.PointLatitude: lat, Longitude: long


func main() 
    flag.Parse()
    var opts []grpc.DialOption
    if *tls 
        var sn string
        if *serverHostOverride != "" 
            sn = *serverHostOverride
        
        var creds credentials.TransportCredentials
        if *caFile != "" 
            var err error
            creds, err = credentials.NewClientTLSFromFile(*caFile, sn)
            if err != nil 
                grpclog.Fatalf("Failed to create TLS credentials %v", err)
            
         else 
            creds = credentials.NewClientTLSFromCert(nil, sn)
        
        opts = append(opts, grpc.WithTransportCredentials(creds))
     else 
        opts = append(opts, grpc.WithInsecure())
    
    conn, err := grpc.Dial(*serverAddr, opts...)
    if err != nil 
        grpclog.Fatalf("fail to dial: %v", err)
    
    defer conn.Close()
    client := pb.NewRouteGuideClient(conn)

    // 获取一个合法的Feature
    printFeature(client, &pb.PointLatitude: 409146138, Longitude: -746188906)

    // 这个Feature在服务端没有回返回空
    printFeature(client, &pb.PointLatitude: 0, Longitude: 0)

    // Looking for features between 40, -75 and 42, -73.
    printFeatures(client, &pb.Rectangle
        Lo: &pb.PointLatitude: 400000000, Longitude: -750000000,
        Hi: &pb.PointLatitude: 420000000, Longitude: -730000000,
    )

    // RecordRoute
    runRecordRoute(client)

    // RouteChat
    runRouteChat(client)

这个例子里面涉及到多个函数接口的调用,以及stream的使用。

以上是关于galang 学习之grpc+ protobuf的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang GO与微服务 grpc

GRPC 浅析

grpc原理

如何以编程方式获取 gRPC / protobuf 版本?

protobuf及grpc的client请求

Grpc Protobuf v1.20+ 使用说明