Istio源码解析系列part2—服务治理配置生效流程解析

Posted ServiceMesher

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Istio源码解析系列part2—服务治理配置生效流程解析相关的知识,希望对你有一定的参考价值。

本文为系列文章:


本系列文章主要从源码(35e2b904)出发,对istio做深入剖析,让大家对istio有更深的认知,从而方便平时排查问题。不了解Service Mesh和Istio的同学请先阅读敖小剑老师如下文章进行概念上的理解:

  • Service Mesh:下一代微服务

  • 服务网格新生代-Istio

服务治理配置生效流程解析

如果大家安装bookinfo并执行过文档中的task,可以了解到,所有服务治理流程都是通过istioctl工具,执行指定yaml配置文件来实现。那么从执行istioctl指令到配置文件生效,整个流程到底是什么样的呢?下面给大家做一个简单的介绍。

整个配置生效的流程图如下所示:

Istio源码解析系列part2—服务治理配置生效流程解析

配置文件解析

以task request-routing为例,我们的需求是把名为jason的用户访问reviews服务的版本切换为v2。 route-rule-reviews-test-v2.yaml内容如下所示:

 
   
   
 
  1. apiVersion: config.istio.io/v1alpha2

  2. kind: RouteRule

  3. metadata:

  4.  name: reviews-test-v2

  5. spec:

  6.  destination:

  7.    name: reviews

  8.  precedence: 2

  9.  match:

  10.    request:

  11.      headers:

  12.        cookie:

  13.          regex: "^(.*?;)?(user=jason)(;.*)?$"

  14.  route:

  15.  - labels:

  16.      version: v2

解析并执行istioctl create指令

通过 istioctl create-f samples/bookinfo/kube/route-rule-reviews-test-v2.yaml指令来使规则生效,执行 istioctl create指令运行的相关代码入口如下:

istio/cmd/istioctl/main.go#postCmd#113行。

 
   
   
 
  1. postCmd = &cobra.Command{

  2.        Use:     "create",

  3.        Short:   "Create policies and rules",

  4.        Example: "istioctl create -f example-routing.yaml",

  5.        RunE: func(c *cobra.Command, args []string) error {

  6.                    if len(args) != 0 {

  7.                      c.Println(c.UsageString())

  8.                      return fmt.Errorf("create takes no arguments")

  9.                    }

  10.                    // varr为转换成功的istio内部model.Config切片,包括routeRule、gateway、ingressRule、egressRule、policy等

  11.                    // others是不能转换成model.Config的k8s object wrapper切片,后面会当成mixer配置来处理

  12.                    varr, others, err := readInputs()

  13.                    if err != nil {

  14.                        return err

  15.                    }

  16.                    if len(varr) == 0 && len(others) == 0 {

  17.                        return errors.New("nothing to create")

  18.                    }

  19.            ...

  20.        }

  21. }

解析出model.Config切片、crd.istioKind切片流程

  • model.Config 为istio配置单元

  • crd.IstioKind 对k8s API对象做了一层封装

readInput函数解析 create命令的相关参数(比如 -f),如果是-f指定的文件是有效文件,则会调用 pilot/pkg/config/kube/crd包的ParseInputs函数解析该文件。

 
   
   
 
  1. func readInputs() ([]model.Config, []crd.IstioKind, error) {

  2.    var reader io.Reader

  3.        ...

  4.            // 读取指定yaml文件

  5.        if in, err = os.Open(file); err != nil {

  6.            return nil, nil, err

  7.        }

  8.        defer func() {

  9.            if err = in.Close(); err != nil {

  10.                log.Errorf("Error: close file from %s, %s", file, err)

  11.            }

  12.        }()

  13.        reader = in

  14.    ...

  15.    input, err := ioutil.ReadAll(reader)

  16.    ...

  17.    return crd.ParseInputs(string(input))

  18. }

ParseInputs函数内部逻辑:

 
   
   
 
  1. func ParseInputs(inputs string) ([]model.Config, []IstioKind, error) {

  2.    var varr []model.Config

  3.    var others []IstioKind

  4.    reader := bytes.NewReader([]byte(inputs))

  5.    var empty = IstioKind{}

  6.    // We store configs as a YaML stream; there may be more than one decoder.

  7.    yamlDecoder := kubeyaml.NewYAMLOrJSONDecoder(reader, 512*1024)

  8.    for {

  9.        obj := IstioKind{}

  10.        // 从reader中反序列化出IstioKind实例obj

  11.        err := yamlDecoder.Decode(&obj)

  12.        ...

  13.        schema, exists := model.IstioConfigTypes.GetByType(CamelCaseToKabobCase(obj.Kind))

  14.        ...

  15.        config, err := ConvertObject(schema, &obj, "")

  16.        ...

  17.        if err := schema.Validate(config.Spec); err != nil {

  18.            return nil, nil, fmt.Errorf("configuration is invalid: %v", err)

  19.        }

  20.        varr = append(varr, *config)

  21.    }

  22.    return varr, others, nil

  23. }

ParseInputs返回三种类型的值[]Config、[]IstioKind、error。

istio/pilot/pkg/model#[]Config

其中Config为Istio内部的配置单元,包含匿名ConfigMeta以及ConfigMeta序列化的protobuf message;用户指定的yaml配置会被解析成相应的实例。

pilot/pkg/config/kube/crd#[]IstioKind

IstioKind为k8s API object的一层封装,内部包含两个匿名结构体和一个map:

 
   
   
 
  1.  type IstioKind struct {

  2.      meta_v1.TypeMeta   `json:",inline"`

  3.      meta_v1.ObjectMeta `json:"metadata"`

  4.      Spec               map[string]interface{} `json:"spec"`

  5.  }

  • IstioKindk8s.io/apimachinery/pkg/apis/meta/v1#TypeMetaTypeMeta包含了k8s REST资源类型(如 RouteRule)、k8s API版本号(如 config.istio.io/v1alpha2)。

  • k8s.io/apimachinery/pkg/apis/meta/v1#ObjectMeta ObjectMeta包含了k8s 资源对象包含的各必要字段,包括Name、Namespace、UID等。

  • Spec 一个存储Spec数据的map。

上述代码将string类型的配置反序列化成 IstioKind实例后,通过 model.IstioConfigTypes.GetByType()方法获取istio的 []ProtoSchema实例。

 
   
   
 
  1. // ConfigDescriptor 是一个由ProtoSchema组成的切片

  2. type ConfigDescriptor []ProtoSchema

  3. // ProtoSchema结构体定义了配置类型名称和protobuf消息的双向映射

  4. type ProtoSchema struct {

  5.    Type        string // 配置的proto类型,如route-rule

  6.    Plural      string // type复数形式,如route-rules

  7.    Group       string // 配置的proto组名,如config

  8.    Version     string // 配置API的版本号,如一lpha2

  9.    MessageName string // 配置的proto message名,如istio.routing.v1alpha1.RouteRule

  10.    Gogo        bool   // 是否为gogo protobuf编码

  11.    Validate    func(config proto.Message) error // protobuf校验函数

  12. }

拿到schema后,通过 ConvertObject方法,将k8s风格的object实例转换成istio内部的Config模型实例,并根据schema类型调用相应的校验函数对protobuf message进行校验。

将配置变更提交到k8s

istio/cmd/istioctl/main.go#postCmd#140行。

 
   
   
 
  1. for _, config := range varr {

  2.    // 初始化namespace数据

  3.    if config.Namespace, err = handleNamespaces(config.Namespace); err != nil {

  4.        return err

  5.    }

  6.    // 构造k8s crd.Client实例,crd.Client包含初始化的apiVerison到restClient映射的map。

  7.    // 对每一种apiVerison(由schema.Group、"istio.io"、schema.Version组成的string,如"config.istio.io/v1alpha2"、"networking.istio.io/v1alpha3"等)

  8.    // 都对应一个crd.restClient实例。

  9.    var configClient *crd.Client

  10.    if configClient, err = newClient(); err != nil {

  11.        return err

  12.    }

  13.    var rev string

  14.    // 通过k8s REST接口执行配置

  15.    if rev, err = configClient.Create(config); err != nil {

  16.        return err

  17.    }

  18.    fmt.Printf("Created config %v at revision %v ", config.Key(), rev)

  19. }

configClient.Create方法执行流程如下:

 
   
   
 
  1. func (cl *Client) Create(config model.Config) (string, error) {

  2.    rc, ok := cl.clientset[apiVersionFromConfig(&config)]

  3.    ...

  4.    // 根据config.Type获取schema

  5.    schema, exists := rc.descriptor.GetByType(config.Type)

  6.    ...

  7.    // 调用schema指定的Validate函数,对Spec这个protobuff进行校验

  8.    if err := schema.Validate(config.Spec); err != nil {

  9.        return "", multierror.Prefix(err, "validation error:")

  10.    }

  11.    // ConvertConfig函数将model.Config实例转换成IstioObject实例。

  12.    // IstioObject是一个k8s API object的接口,crd包下有很多结构体实现了该接口,如MockConfig、RouteRule等

  13.    out, err := ConvertConfig(schema, config)

  14.    ...

  15.    // 检索clientset map,用指定的restClient实例发送POST请求,使配置生效。

  16.    obj := knownTypes[schema.Type].object.DeepCopyObject().(IstioObject)

  17.    err = rc.dynamic.Post().

  18.        Namespace(out.GetObjectMeta().Namespace).

  19.        Resource(ResourceName(schema.Plural)).

  20.        Body(out).

  21.        Do().Into(obj)

  22.    if err != nil {

  23.        return "", err

  24.    }

  25.    return obj.GetObjectMeta().ResourceVersion, nil

  26. }

pilot-discovery初始化

pilot/cmd/pilot-discovery/main.go#57行,构造discoveryServer实例。

 
   
   
 
  1. ...

  2. discoveryServer, err := bootstrap.NewServer(serverArgs)

  3. if err != nil {

  4.    return fmt.Errorf("failed to create discovery service: %v", err)

  5. }

  6. ...

监听k8s相关资源变更

NewServer函数内部流程如下:

 
   
   
 
  1. func NewServer(args PilotArgs) (*Server, error) {

  2.    ...

  3.    // 初始化pilot配置控制器,根据pilot-discovery启动指令,初始化配置控制器。

  4.    // 默认只会初始化kube配置控制器(kubeConfigController,它实现了model.ConfigStoreCache接口)。

  5.    // kubeConfigController会watch k8s pod registration 、ingress resources、traffic rules等变化。

  6.    if err := s.initConfigController(&args); err != nil {

  7.        return nil, err

  8.    }

  9.    // 初始化服务发现控制器,控制器内部会构造K8sServiceControllers。

  10.    if err := s.initServiceControllers(&args); err != nil {

  11.        return nil, err

  12.    }

  13.    // 初始化DiscoveryService实例,实例内部注册了envoy xDS路由。

  14.    // kubeConfigController中watch到变更后,envoy轮询xDS接口,获取变更。

  15.    if err := s.initDiscoveryService(&args); err != nil {

  16.        return nil, err

  17.    }

  18.    ...

  19. }

注册envoy xDS路由

initDiscoveryServic方法内部流程如下:

 
   
   
 
  1. func (s *Server) initDiscoveryService(args *PilotArgs) error {

  2.    // 构造pilot runtime environment。environment中保存了kubeConfigController、serviceController等。

  3.    environment := model.Environment{

  4.        Mesh:             s.mesh,

  5.        IstioConfigStore: model.MakeIstiostore(s.configController),

  6.        ServiceDiscovery: s.ServiceController,

  7.        ServiceAccounts:  s.ServiceController,

  8.        MixerSAN:         s.mixerSAN,

  9.    }

  10.    // 构造DiscoveryService实例。

  11.    discovery, err := envoy.NewDiscoveryService(

  12.        s.ServiceController,

  13.        s.configController,

  14.        environment,

  15.        args.DiscoveryOptions,

  16.    )

  17. }

NewDiscoveryService方法内部流程如下:

 
   
   
 
  1. func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,

  2.    environment model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {

  3.    out := &DiscoveryService{

  4.        Environment: environment, // 将environment赋值给Environment成员。

  5.        sdsCache:    newDiscoveryCache("sds", o.EnableCaching),

  6.        cdsCache:    newDiscoveryCache("cds", o.EnableCaching),

  7.        rdsCache:    newDiscoveryCache("rds", o.EnableCaching),

  8.        ldsCache:    newDiscoveryCache("lds", o.EnableCaching),

  9.    }

  10.    container := restful.NewContainer()

  11.    ...

  12.    // 注册web service容器。

  13.    out.Register(container)

  14. }

out.Register方法内部流程如下:

 
   
   
 
  1. func (ds *DiscoveryService) Register(container *restful.Container) {

  2.    ws := &restful.WebService{}

  3.    ws.Produces(restful.MIME_JSON)

  4.    ...

  5.    // 注册Envoy xDS(SDS、CDS、RDS、LDS)路由

  6.    // 注册 Envoy RDS(Route discovery service)路由。https://www.envoyproxy.io/docs/envoy/latest/api-v1/route_config/rds

  7.    // RDS可以与SDS、EDS协同工作,来构建用户指定的路由拓扑(如流量切换、蓝绿部署等)。

  8.    ws.Route(ws.

  9.        GET(fmt.Sprintf("/v1/routes/{%s}/{%s}/{%s}", RouteConfigName, ServiceCluster, ServiceNode)).

  10.        To(ds.ListRoutes).

  11.        Doc("RDS registration").

  12.        Param(ws.PathParameter(RouteConfigName, "route configuration name").DataType("string")).

  13.        Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).

  14.        Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))

  15.    // 注册 Envoy LDS(Listener discovery service)路由。https://www.envoyproxy.io/docs/envoy/latest/configuration/listeners/lds

  16.    // Envoy可以从通过这个接口动态获取需要新的Listener信息,从而在运行时动态实例化Listener。

  17.    // Listener可以用来处理不同的代理任务(如速率限制、HTTP连接管理、原始TCP代理等)。

  18.    ws.Route(ws.

  19.        GET(fmt.Sprintf("/v1/listeners/{%s}/{%s}", ServiceCluster, ServiceNode)).

  20.        To(ds.ListListeners).

  21.        Doc("LDS registration").

  22.        Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).

  23.        Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))

  24.    ...

  25. }

  • RDS路由绑定的 ds.ListRoutes方法读取environment中相关配置,返回给Envoy实例需要配置的路由信息。

  • LDS路由绑定的 ds.ListListeners方法读取environment中相关配置,返回给Envoy实例需要的Listener信息。 Envoy实例轮询xDS接口,获取变更的配置信息,最终执行具体的服务治理策略。

总结

结合上文中贴出的流程图

总结如下

Istio的pilot-discovery启动

  1. 初始化kube配置控制器,控制器中watch k8s pod、ingress以及流量管理规则等变更。

  2. 初始化envoy各发现服务,注册envoy xDS路由,绑定相应的配置变更handler。

  3. pilot-discovery等待envoy实例轮询xDS接口,将变更返给envoy实例。

用户通过istioctl应用配置

  1. istioctl解析指令(create、delete等),通过k8s REST接口,将变更推送的k8s。

  2. k8s产生变更,变更同步到 kubeConfigController中。

  3. envoy实例轮询xDS接口,应用变更。

点击【阅读原文】www.servicemesher.com中浏览效果更佳。


以上是关于Istio源码解析系列part2—服务治理配置生效流程解析的主要内容,如果未能解决你的问题,请参考以下文章

Istio技术与实践01: 源码解析之Pilot多云平台服务发现机制

Istio系列学习----Istio的路由规则配置:VirtualService

使用 Istio治理微服务入门

istio 服务治理指引

2020-05-18【Istio服务治理,K8S各个组件】

Istio微服务治理网格基本使用以及与Kubernetes集成的架构