gokit学习

Posted 爆米花9958

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了gokit学习相关的知识,希望对你有一定的参考价值。

微服务架构设计关注点

1、Circuit breaker(熔断器)

2、Rate limiter(限流器)

3、Logging(日志)

4、Metrics(Prometheus统计)

5、Request tracing(请求跟踪)

6、Service discovery and load balancing(服务发现和负载均衡)

Go-kit 当前集成的组件

功能

组件

circuit breaker断路器

hystrix-go gobreaker handy breaker

Metrics 指标

prometheus, dogstatsd, influx,graphite 等多个平台

服务发现

consul, dnssrv, etcd, eureka, lb, zookeeper

Request Tracing

Opentracing, LightStep, AppDash, Zipkin

另外日志和限流器组件, 可轻松与常见的日志库集成, 如zap, logrus

Go-kit 技术细节

go-kit 使用一个抽象Endpoint 来表示每一个服务提供的方法。Endpoint通过一个service实现具体功能。

go-kit 组件围绕Endpoint来构建, 包括断路器, 限流器,日志, Metrics, 请求追踪, 服务发现和负载均衡。比如你可以使用etcd, consul, zookeeper实现你的服务注册, 甚至如果你的团队之前使用的是用的Spring Cloud家族的Eureka, go-kit 也提供支持;

Endpoint

以下就是Endpoint的实现, 其他go-kit组件全部通过装饰者模式注入

type Endpoint func(ctx context.Context, request interface) (response interface, err error)

type Middleware func(Endpoint) Endpoint

如日志中间件, 仅需实现gokit的Logger interface即可

type Endpoint func(ctx context.Context, request interface) (response interface, err error)

func(log Logger, in endpoint.Endpoint) endpoint.Endpoint 
    return func(ctx context.Context, req interface) (interface, error) 
            logger.Log("input", toJSON(req))
            resp, err := in(ctx, req)
            logger.Log("output", toJSON(resp), "err", err)
            return resp, err
    

看了这么多,gokit是什么?怎么用还是有点晕,如果有SpringBoot的使用经验倒是可以类比一下,在SpringBoot中,我们先看下常用的层次结构

  • @Repository:标注数据访问层,可以告诉SpringMVC这是一个数据访问层,并将其申明为一个bean,例如UserDao接口的实现类UserDaoImpl,在类上加注解@Repository("userDao"),bean的名称为userDao
  • @Service:标注业务逻辑层,例如UserService接口的实现类,在类上加@Service("userService"),bean的名称为userService
  • @Controller:控制层,在控制层类上加@Controller即可,确认其是一个控制层类
  • @Entity:标准实体类,例如实体类User

而go-kit也分为三层,我们可以根据每层职责的不同进行重新组合,如下图所示,从上到下依次为:transport层,endpoint层,service层,dao层。

 

Transport就相当于Controller控制层;Endpoint就相当于Service业务逻辑层,而gokit的Service层则相当于业务实现层,例如我们经常写的CRUD操作就是业务实现层

Gokit使用http

接下来,我们写一个小栗子来说明一下

Step-1:创建Service

按照gokit的设计理念,Service将作为核心业务逻辑实现部分。所以,我们写一个Service用于实现两个整数之间的加减乘除运算。

在GOPATH下新建一个项目,然后新建go文件service.go,定义接口Service,代码如下所示:

// Service Define a service interface
type Service interface 

	// Add calculate a+b
	Add(a, b int) int

	// Subtract calculate a-b
	Subtract(a, b int) int

	// Multiply calculate a*b
	Multiply(a, b int) int

	// Divide calculate a/b
	Divide(a, b int) (int, error)

接下来创建结构ArithmeticService实现Service接口。加减乘除的实现非常简单,只有除法运算需要做下异常判断,代码如下所示:

//ArithmeticService implement Service interface
type ArithmeticService struct 


// Add implement Add method
func (s ArithmeticService) Add(a, b int) int 
	return a + b


// Subtract implement Subtract method
func (s ArithmeticService) Subtract(a, b int) int 
	return a - b


// Multiply implement Multiply method
func (s ArithmeticService) Multiply(a, b int) int 
	return a * b


// Divide implement Divide method
func (s ArithmeticService) Divide(a, b int) (int, error) 
	if b == 0 
		return 0, errors.New("the dividend can not be zero!")
	
	return a / b, nil

Step-2:创建请求、响应模型

请求模型:接收http客户端的请求后,把请求参数转为请求模型对象,用于后续业务逻辑处理。观察Service接口可以发现四个接口方法的输入参数均为两个整数,区别在于运算类型不同,所以请求模型只需包含三个字段,即:请求类型、第一个整数、第二个整数。

响应模型:用于向客户端响应结果。对于响应模型可以设置两个字段:一是结果,用于表示正常情况下的运算结果;二是错误描述,用于表示异常时的错误描述。

创建go文件endpoints.go,编写如下代码:

// ArithmeticRequest define request struct
type ArithmeticRequest struct 
	RequestType string `json:"request_type"`
	A           int    `json:"a"`
	B           int    `json:"b"`


// ArithmeticResponse define response struct
type ArithmeticResponse struct 
	Result int   `json:"result"`
	Error  error `json:"error"`

Step-3:创建Endpoint

在gokit中Endpoint是可以包装到http.Handler中的特殊方法,gokit采用装饰着模式,把Service应该执行的逻辑封装到Endpoint方法中执行。Endpoint的作用是:调用Service中相应的方法处理请求对象(ArithmeticRequest),返回响应对象(ArithmeticResponse)。接下来在endpoints.go文件中增加以下代码:

// MakeArithmeticEndpoint make endpoint
func MakeArithmeticEndpoint(svc Service) endpoint.Endpoint 
	return func(ctx context.Context, request interface) (response interface, err error) 
		req := request.(ArithmeticRequest)

		var (
			res, a, b int
			calError  error
		)

		a = req.A
		b = req.B

		if strings.EqualFold(req.RequestType, "Add") 
			res = svc.Add(a, b)
		 else if strings.EqualFold(req.RequestType, "Substract") 
			res = svc.Subtract(a, b)
		 else if strings.EqualFold(req.RequestType, "Multiply") 
			res = svc.Multiply(a, b)
		 else if strings.EqualFold(req.RequestType, "Divide") 
			res, calError = svc.Divide(a, b)
		 else 
			return nil, ErrInvalidRequestType
		

		return ArithmeticResponseResult: res, Error: calError, nil
	

Step-4:创建Transport

Transport层用于接收用户网络请求并将其转为Endpoint可以处理的对象,然后交由Endpoint执行,最后将处理结果转为响应对象向用户响应。为了完成这项工作,Transport需要具备两个工具方法:

  • 解码器:把用户的请求内容转换为请求对象(ArithmeticRequest);
  • 编码器:把处理结果转换为响应对象(ArithmeticResponse);

下面创建新的go文件,命名为transports.go,并添加如下代码:

// decodeArithmeticRequest decode request params to struct
func decodeArithmeticRequest(_ context.Context, r *http.Request) (interface, error) 
	vars := mux.Vars(r)
	requestType, ok := vars["type"]
	if !ok 
		return nil, ErrorBadRequest
	

	pa, ok := vars["a"]
	if !ok 
		return nil, ErrorBadRequest
	

	pb, ok := vars["b"]
	if !ok 
		return nil, ErrorBadRequest
	

	a, _ := strconv.Atoi(pa)
	b, _ := strconv.Atoi(pb)

	return ArithmeticRequest
		RequestType: requestType,
		A:           a,
		B:           b,
	, nil


// encodeArithmeticResponse encode response to return
func encodeArithmeticResponse(ctx context.Context, w http.ResponseWriter, response interface) error 
	w.Header().Set("Content-Type", "application/json;charset=utf-8")
	return json.NewEncoder(w).Encode(response)

decodeArithmeticRequest从用户请求中解析请求参数type、a、b,并将三个参数转换为请求对象ArithmeticRequest;encodeArithmeticResponse把响应内容转为json结构,向用户回写响应内容。完成以上工作后,就可以使用解码器、编码器创建HTTP处理方法了,代码如下所示:

var (
	ErrorBadRequest = errors.New("invalid request parameter")
)

// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoint endpoint.Endpoint, logger log.Logger) http.Handler 
	r := mux.NewRouter()

	options := []kithttp.ServerOption
		kithttp.ServerErrorLogger(logger),
		kithttp.ServerErrorEncoder(kithttp.DefaultErrorEncoder),
	

	r.Methods("POST").Path("/calculate/type/a/b").Handler(kithttp.NewServer(
		endpoint,
		decodeArithmeticRequest,
		encodeArithmeticResponse,
		options...,
	))
	return r

Step-5:编写Main方法

到目前为止,我们已经为该服务完成了Service、Endpoint、Transport三个层次的构建工作,只需要通过main方法将它们按照gokit的要求组织起来,然后使用http库将服务发布即可。组织步骤如下:

  • 创建Service对象:声明Service接口,实例化为ArithmeticService;
  • 创建Endpoint对象;
  • 创建http处理对象handler;
  • 启动http服务;

创建go文件main.go添加以下代码:

func main() 

	ctx := context.Background()
	errChan := make(chan error)

	var svc Service
	svc = ArithmeticService
	endpoint := MakeArithmeticEndpoint(svc)

	var logger log.Logger
	
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	

	r := MakeHttpHandler(ctx, endpoint, logger)

	go func() 
		fmt.Println("Http Server start at port:9000")
		handler := r
		errChan <- http.ListenAndServe(":9000", handler)
	()

	go func() 
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		errChan <- fmt.Errorf("%s", <-c)
	()

	fmt.Println(<-errChan)

Step-6:编译&运行

接下来直接在idea中运行项目

 

通过postman调用一下

 

请求流程: 

请求->decodeArithmeticRequest-> endPoint -> encodeArithmeticResponse-> 返回输出

Gokit使用gRPC

gRPC的Transport会比http稍微麻烦一些,主要是gRPC还需要实现一个Handler接口,除此之外与http的实现几乎差不多。

我们还利用已经编写好的service,代码就不再粘贴了

Step-1:创建请求、响应模型

创建请求、响应模型的方式和使用http有一些不同,我们首先定义proto文件及生成pb文件

在实现grpcServer之前先得定义接口:

syntax = "proto3";

message GrpcResponse 
    int32 Result = 1;
    string Error = 2;


message GrpcRequest 
    string RequestType=1;
    int32 A = 2;
    int32 B = 3;


service GrpcService 
    rpc Add(GrpcRequest) returns (GrpcResponse) 

    rpc    Subtract(GrpcRequest) returns (GrpcResponse) 

    rpc    Multiply(GrpcRequest) returns (GrpcResponse) 

    rpc    Divide(GrpcRequest) returns (GrpcResponse) 

进入proto文件目录下执行

protoc service.proto --go_out==plugins=grpc:. *.proto

生成的文件如下:

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
//     protoc-gen-go v1.25.0-devel
//     protoc        v3.14.0
// source: arithmetic.proto

package mygrpc

import (
   context "context"
   grpc "google.golang.org/grpc"
   codes "google.golang.org/grpc/codes"
   status "google.golang.org/grpc/status"
   protoreflect "google.golang.org/protobuf/reflect/protoreflect"
   protoimpl "google.golang.org/protobuf/runtime/protoimpl"
   reflect "reflect"
   sync "sync"
)

const (
   // Verify that this generated code is sufficiently up-to-date.
   _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
   // Verify that runtime/protoimpl is sufficiently up-to-date.
   _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type GrpcResponse struct 
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Result int32  `protobuf:"varint,1,opt,name=Result,proto3" json:"Result,omitempty"`
   Error  string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"`

......
type GrpcRequest struct 
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   RequestType string `protobuf:"bytes,1,opt,name=RequestType,proto3" json:"RequestType,omitempty"`
   A           int32  `protobuf:"varint,2,opt,name=A,proto3" json:"A,omitempty"`
   B           int32  `protobuf:"varint,3,opt,name=B,proto3" json:"B,omitempty"`



func (c *grpcServiceClient) Add(ctx context.Context, in *GrpcRequest, opts ...grpc.CallOption) (*GrpcResponse, error) 
   out := new(GrpcResponse)
   err := c.cc.Invoke(ctx, "/GrpcService/Add", in, out, opts...)
   if err != nil 
      return nil, err
   
   return out, nil


func (c *grpcServiceClient) Subtract(ctx context.Context, in *GrpcRequest, opts ...grpc.CallOption) (*GrpcResponse, error) 
   out := new(GrpcResponse)
   err := c.cc.Invoke(ctx, "/GrpcService/Subtract", in, out, opts...)
   if err != nil 
      return nil, err
   
   return out, nil


func (c *grpcServiceClient) Multiply(ctx context.Context, in *GrpcRequest, opts ...grpc.CallOption) (*GrpcResponse, error) 
   out := new(GrpcResponse)
   err := c.cc.Invoke(ctx, "/GrpcService/Multiply", in, out, opts...)
   if err != nil 
      return nil, err
   
   return out, nil


func (c *grpcServiceClient) Divide(ctx context.Context, in *GrpcRequest, opts ...grpc.CallOption) (*GrpcResponse, error) 
   out := new(GrpcResponse)
   err := c.cc.Invoke(ctx, "/GrpcService/Divide", in, out, opts...)
   if err != nil 
      return nil, err
   
   return out, nil


// GrpcServiceServer is the server API for GrpcService service.
type GrpcServiceServer interface 
   Add(context.Context, *GrpcRequest) (*GrpcResponse, error)
   Subtract(context.Context, *GrpcRequest) (*GrpcResponse, error)
   Multiply(context.Context, *GrpcRequest) (*GrpcResponse, error)
   Divide(context.Context, *GrpcRequest) (*GrpcResponse, error)

......

Step-2:创建Endpoint

在gokit中Endpoint是可以包装到grpc.Handler中的特殊方法,gokit采用装饰着模式,把Service应该执行的逻辑封装到Endpoint方法中执行。Endpoint的作用是:调用Service中相应的方法处理请求对象(GrpcRequest ),返回响应对象(GrpcResponse )。接下来在myendpoint.go文件中增加以下代码:

func MakeArithmeticEndpoint(svc myhttp.Service) endpoint.Endpoint 
   return func(ctx context.Context, request interface) (response interface, err error) 
      req := request.(*GrpcRequest)

      var (
         a, b, res int
         calError  error
      )

      a = int(req.A)
      b = int(req.B)

      if strings.EqualFold(req.RequestType, "Add") 
         res = svc.Add(a, b)
       else if strings.EqualFold(req.RequestType, "Substract") 
         res = svc.Subtract(a, b)
       else if strings.EqualFold(req.RequestType, "Multiply") 
         res = svc.Multiply(a, b)
       else if strings.EqualFold(req.RequestType, "Divide") 
         res, calError = svc.Divide(a, b)
       else 
         return nil, errors.New("the dividend can not be zero!")
      
      if calError != nil 
         return &GrpcResponseResult: int32(res), Error: calError.Error(), nil
      
      return &GrpcResponseResult: int32(res), Error: "", nil
   

Step-3:创建Transport

Transport层用于接收用户网络请求并将其转为Endpoint可以处理的对象,然后交由Endpoint执行,最后将处理结果转为响应对象向用户响应。

package mygrpc

import (
   "context"
   "github.com/go-kit/kit/endpoint"
   "github.com/go-kit/kit/transport/grpc"
)

func decodeRequest(_ context.Context, req interface) (interface, error) 
   return req, nil


func encodeResponse(_ context.Context, req interface) (interface, error) 
   return req, nil


type GrpcServer struct 
   grpcHandler grpc.Handler


func (g *GrpcServer) Add(ctx context.Context, in *GrpcRequest) (*GrpcResponse, error) 
   _, rsp, err := g.grpcHandler.ServeGRPC(ctx, in)
   if err != nil 
      return nil, err
   
   return rsp.(*GrpcResponse), err


func (g *GrpcServer) Subtract(ctx context.Context, in *GrpcRequest) (*GrpcResponse, error) 
   _, rsp, err := g.grpcHandler.ServeGRPC(ctx, in)
   if err != nil 
      return nil, err
   
   return rsp.(*GrpcResponse), err


func (g *GrpcServer) Multiply(ctx context.Context, in *GrpcRequest) (*GrpcResponse, error) 
   _, rsp, err := g.grpcHandler.ServeGRPC(ctx, in)
   if err != nil 
      return nil, err
   
   return rsp.(*GrpcResponse), err


func (g *GrpcServer) Divide(ctx context.Context, in *GrpcRequest) (*GrpcResponse, error) 
   _, rsp, err := g.grpcHandler.ServeGRPC(ctx, in)
   if err != nil 
      return nil, err
   
   return rsp.(*GrpcResponse), err



func MakeGRPCHandler(ctx context.Context, endpoint endpoint.Endpoint) *GrpcServer 
   return &GrpcServer
      grpcHandler: grpc.NewServer(
         endpoint,
         decodeRequest,
         encodeResponse,
      ),
   

Step-4:编写Main方法

main方法也要做一些修改,代码如下:

var svc myhttp.Service
svc = myhttp.ArithmeticService
endpoint = mygrpc.MakeArithmeticEndpoint(svc)
grpcServer := mygrpc.MakeGRPCHandler(ctx, endpoint)

go func() 
   //启动grpc服务
   ls, _ := net.Listen("tcp", ":9002")
   gs := grpc.NewServer()
   mygrpc.RegisterGrpcServiceServer(gs, grpcServer)
   gs.Serve(ls)

()

GRPC Client

写一个client来测试一下我们的代码

package main

import (
   "context"
   "fmt"
   "gokitdemo/pkg/mygrpc"
   "google.golang.org/grpc"
)

func main() 
   serviceAddress := "127.0.0.1:9002"
   conn, err := grpc.Dial(serviceAddress, grpc.WithInsecure())
   if err != nil 
      panic("connect error")
   
   defer conn.Close()

   grpcClient:=mygrpc.NewGrpcServiceClient(conn)
   response,err:=grpcClient.Add(context.Background(),&mygrpc.GrpcRequestA:5,B:4,RequestType:"Add")
   fmt.Println(response.Result)

运行结果:

 

参考:

https://www.jianshu.com/p/0c34a75569b1

https://www.jianshu.com/p/cffe039fa060

https://www.jianshu.com/p/44e48a8fa0de

https://juejin.cn/post/6844903780933173261

https://blog.csdn.net/weixin_42117918/article/details/89186723

https://lattecake.com/post/20140#

 

 

 

以上是关于gokit学习的主要内容,如果未能解决你的问题,请参考以下文章

NUCLEO-L496ZG+Gokit3S+Rtthead+AT组件组网

gokit的理解

gokit的理解

gokit的理解

机智云5.0推出IoT套件GoKit4.0 可实现物联网应用协同开发

通过Gokit玩转Echo语音控制开发技能