手撸golang GO与微服务 grpc

Posted ioly

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸golang GO与微服务 grpc相关的知识,希望对你有一定的参考价值。

手撸golang GO与微服务 grpc

缘起

最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

GRPC

gRPC是跨平台、跨语言并且效率非常高的RPC方式。

gRPC默认使用protobuf。
可以用proto files创建gRPC服务,
用protobuf消息类型来定义方法参数和返回类型。

建议在gRPC里使用proto3,
因为这样可以使用gRPC支持的全部语言,
并且能避免proto2客户端与proto3服务端交互时出现兼容性问题。

在实战项目中使用gRPC时,
一定要注意服务器的防火墙必须支持HTTP2.0,
因为gRPC是基于HTTP2.0设计的。

目标

  • 测试验证gRPC的四种通讯模式:

    • 请求-应答模式
    • 客户端推流模式
    • 客户端拉流模式
    • 双向流模式

设计

  • hello.proto: 定义gRPC通讯协议
  • HelloServer.go: gRPC服务端的实现
  • pb/hello.pb.go: protoc生成的代码,源码略
  • logger/logger.go: 搜集运行日志以便诊断, 源码略

单元测试

grpc_test.go,依次在四种gRPC通讯模式下发送/接收1000条消息,并对所有消息日志进行校验

package grpc

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "io"
    g "learning/gooop/grpc"
    "learning/gooop/grpc/logger"
    "learning/gooop/grpc/pb"
    "strconv"
    "sync"
    "testing"
    "time"
)

func Test_HelloServer(t *testing.T) {
    fnAssertTrue := func(b bool, msg string) {
        if !b {
            t.Fatal(msg)
        }
    }
    logger.Verbose(false)

    serverPort := 3333
    serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
    iTotalMsgCount := 1000

    // start server
    srv := new(g.HelloServer)
    err := srv.BeginServeTCP(serverPort)
    if err != nil {
        t.Fatal(err)
    }
    time.Sleep(100 * time.Millisecond)

    // connect to grpc server
    conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
    if err != nil {
        t.Fatal(err)
    }
    defer conn.Close()

    // create grpc client
    client := pb.NewHelloServerClient(conn)

    // test SimpleRequest
    ctx := context.Background()

    for i := 0; i < iTotalMsgCount; i++ {
        msg := &pb.HelloMessage{
            Msg: fmt.Sprintf("SimpleRequest %d", i),
        }
        reply, err := client.SimpleRequest(ctx, msg)
        if err != nil {
            t.Fatal(err)
        }
        fnAssertTrue(reply.Msg == "reply "+msg.Msg, "invalid SimpleRequest response")
    }
    t.Log("passed SimpleRequest")

    // test ClientStream
    clientStream, err := client.ClientStream(ctx)
    if err != nil {
        t.Fatal(err)
    }
    for i := 0; i < iTotalMsgCount; i++ {
        msg := &pb.HelloMessage{
            Msg: fmt.Sprintf("ClientStream %08d", i),
        }
        err = clientStream.Send(msg)
        if err != nil {
            t.Fatal(err)
        }
    }
    reply, err := clientStream.CloseAndRecv()
    if err != nil {
        t.Fatal(err)
    }
    fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")

    // logger.Logf("HelloServer.ClientStream, recv %s", msg.String())
    for i := 0; i < iTotalMsgCount; i++ {
        log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
        fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
    }
    t.Log("passed ClientStream")

    // test ServerStream
    serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
    if err != nil {
        t.Fatal(err)
    }

    for {
        msg, err := serverStream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            t.Fatal(err)
        }
        logger.Logf("ServerStream.Recv %s", msg.Msg)
    }

    for i := 0; i < iTotalMsgCount; i++ {
        log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
        fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
    }
    t.Log("passed ServerStream")

    // test DualStream
    dualStream, err := client.DualStream(ctx)
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < iTotalMsgCount; i++ {
            msg := &pb.HelloMessage{
                Msg: fmt.Sprintf("DualStream.Send %08d", i),
            }
            err := dualStream.Send(msg)
            if err != nil {
                t.Fatal(err)
            }
        }

        err = dualStream.CloseSend()
        if err != nil {
            t.Fatal(err)
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            msg, err := dualStream.Recv()
            if err == io.EOF {
                break
            }

            if err != nil {
                t.Fatal(err)
            }

            logger.Logf("DualStream.Recv %s", msg.Msg)
        }
    }()

    wg.Wait()
    for i := 0; i < iTotalMsgCount; i++ {
        // Msg: "reply " + msg.Msg,
        // logger.Logf("DualStream.Recv %s", msg.Msg)
        log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
        fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
    }
    t.Log("passed DualStream")
}

测试输出

$ go test -v grpc_test.go 
=== RUN   Test_HelloServer
    grpc_test.go:60: passed SimpleRequest
    grpc_test.go:87: passed ClientStream
    grpc_test.go:110: passed ServerStream
    grpc_test.go:159: passed DualStream
--- PASS: Test_HelloServer (0.79s)
PASS
ok      command-line-arguments  0.791s

hello.proto

定义四种通讯模式的rpc接口

syntax = "proto3";

package pb;

option go_package="./pb";

service HelloServer {
  rpc SimpleRequest(HelloMessage) returns (HelloMessage);
  rpc ClientStream(stream HelloMessage) returns (HelloMessage);
  rpc ServerStream(HelloMessage) returns (stream HelloMessage);
  rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
}

message HelloMessage {
  string msg = 1;
}

HelloServer.go

gRPC服务端的实现

package grpc

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "io"
    "learning/gooop/grpc/logger"
    "learning/gooop/grpc/pb"
    "net"
    "strconv"
)

type HelloServer int

func (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
    //logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)
    msg.Msg = "reply " + msg.Msg
    return msg, nil
}

func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            logger.Logf("HelloServer.ClientStream, EOF")
            break
        }

        if err != nil {
            logger.Logf("HelloServer.ClientStream, err=%v", err)
            return err
        }

        logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
    }

    err := stream.SendAndClose(&pb.HelloMessage{
        Msg: "reply ClientStream",
    })
    if err != nil {
        logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
    }
    return nil
}

func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
    iMsgCount, err := strconv.Atoi(msg.Msg)
    if err != nil {
        return err
    }

    for i := 0; i < iMsgCount; i++ {
        msg := &pb.HelloMessage{
            Msg: fmt.Sprintf("ServerStream-%08d", i),
        }
        err := stream.Send(msg)
        if err != nil {
            return err
        }
    }

    return nil
}

func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }

        if err != nil {
            logger.Logf("HelloServer.DualStream, recv err=%v", err)
            return err
        }
        logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)

        ret := &pb.HelloMessage{
            Msg: "reply " + msg.Msg,
        }
        err = stream.Send(ret)
        if err != nil {
            logger.Logf("HelloServer.DualStream, send err=%v", err)
            return err
        }
    }
}

func (me *HelloServer) BeginServeTCP(port int) error {
    listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
    if err != nil {
        return err
    }

    server := grpc.NewServer()
    pb.RegisterHelloServerServer(server, me)
    go func() {
        panic(server.Serve(listener))
    }()

    return nil
}

(end)

以上是关于手撸golang GO与微服务 grpc的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang GO与微服务 ChatServer之3 压测与诊断

手撸golang GO与微服务 聚合模式之1

手撸golang GO与微服务 聚合模式之2

手撸golang GO与微服务 ChatServer之1

手撸golang GO与微服务 ChatServer之2

手撸golang GO与微服务 ES-CQRS模式之2