consul-agent

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了consul-agent相关的知识,希望对你有一定的参考价值。

consul agent 分为 client 和 server 两种模式

client 一般部署在靠近应用的地方(甚至与应用同机),负责把应用的读写等请求(本文只讨论读写)转发给 server

server 参加raft选举,只有leader server负责写,读请求一般也由leader处理(除非请求使用stale一致性模式,此时非leader的server也可以直接响应读)

入口 github.com/hashicorp/consul/main.go

入口

func main()

os.Exit(realMain())



真实入口

func realMain() int

...

cmds := command.Map(ui)

cli := &cli.CLI

Args:        args, 

Commands:    cmds,

Autocomplete: true,

Name:        "consul",

HelpFunc:    cli.FilteredHelpFunc(names, cli.BasicHelpFunc("consul")),



exitCode, err := cli.Run()

...



命令加载

func Map(ui cli.Ui) map[string]cli.CommandFactory

m := make(map[string]cli.CommandFactory)

for name, fn := range registry

thisFn := fn

m[name] = func() (cli.Command, error)

  return thisFn(ui)





return m



初始化时候加载命令

func init()

...

Register("agent", func(ui cli.Ui) (cli.Command, error) { return agent.New(ui, rev, ver, verPre, verHuman, make(chan struct{})), nil })

agent 命令运行

func (c *cmd) Run(args []string) int

...

code := c.run(args)

... 



agent 运行

func (c *cmd) run(args []string) int

...

agent, err := agent.New(config)

...

if err := agent.Start(); err != nil

...



agent 启动

func (a *Agent) Start() error

...

if c.ServerMode

...

server, err := consul.NewServerLogger(consulCfg,a.logger,a.tokens)

...

a.delegate = server

...

else

...

client, err := consul.NewClientLogger(consulCfg,a.logger)

...

a.delegate = client

...

servers, err := a.listenHTTP()

...

for _, srv := range servers

if err := a.serveHTTP(srv); err != nil

return err





...



agent 启动单个http服务(由 Start 中的循环对每个 server 逐个调用)

func (a *Agent) serveHTTP(srv *HTTPServer) error

...

err := srv.Serve(srv.ln)

...



agent 初始化http服务

func (a *Agent) listenHTTP() ([]*HTTPServer, error)

...

srv.Server.Handler = srv.handler(a.config.EnableDebug)

...



注册agent 路由

func (s *HTTPServer) handler(enableDebug bool) http.Handler

...

for pattern, fn := range endpoints

...

handleFuncMetrics(pattern, s.wrap(bound, methods))

...

...



聚合agent所需路由

func init()

...

registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).KVSEndpoint)

...   



agent 注册endpoints方法

func registerEndpoint(pattern string, methods []string, fn unboundEndpoint)

...

endpoints[pattern] = fn

...



agent kv 入口

func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface, error)

...

switch req.Method

case "GET":

if keyList

return s.KVSGetKeys(resp, req, &args)



return s.KVSGet(resp, req, &args)

case "PUT":

return s.KVSPut(resp, req, &args)

case "DELETE":

return s.KVSDelete(resp, req, &args)

default:

return nil, MethodNotAllowedError{req.Method, []string{"GET", "PUT", "DELETE"}}



...



agent 真实 kv get

func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface, error)

...

if err := s.agent.RPC(method, &args, &out); err != nil

return nil, err



...



agent发起rpc

func (a *Agent) RPC(method string, args interface, reply interface) error

    ...

    return a.delegate.RPC(method, args, reply)

    ...



client初始化

func NewClientLogger(config *Config, logger *log.Logger) (*Client, error)

...

...



agent代理(client)发起rpc

func (c *Client) RPC(method string, args interface, reply interface) error

...

rpcErr := c.connPool.RPC(c.config.Datacenter,server.Addr,server.Version,method,server.UseTLS,args,reply)

...



client 发起rpc

func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, useTLS bool, args interface, reply interface) error   

...

err = msgpackrpc.CallWithCodec(sc.codec,method,args,reply)

...



server 初始化

func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error)

...

if err := s.setupRPC(tlsWrap); err != nil

...

go s.listen(s.Listener)

...



...

if err := s.setupRaft(); err != nil

s.Shutdown()

return nil, fmt.Errorf("Failed to start Raft: %v", err)



...



初始化rpc服务对应的endpoints

func init()

...

registerEndpoint(func(s *Server) interface{} { return &KVS{s} })

...



server 初始化rpc

func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error

...

ln, err := net.ListenTCP("tcp", s.config.RPCAddr)

s.Listener = ln

...



server 启动rpc服务

func (s *Server) listen(listener net.Listener)

...

  go s.handleConn(conn, false)

...



server  处理 rpc

func (s *Server) handleConn(conn net.Conn, isTLS bool)

...

typ := pool.RPCType(buf[0])

...

switch typ

case pool.RPCConsul:

s.handleConsulConn(conn)

...



server 处理 consulconn

func (s *Server) handleConsulConn(conn net.Conn)

...

if err := s.rpcServer.ServeRequest(rpcCodec); err != nil

...



...



server  kv get

func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error

...

forward:如果本节点是leader,则返回done=false,继续在本地处理;如果不是,则通过rpc转发给leader处理,并返回done=true

if done, err := k.srv.forward("KVS.Get", args,args,reply); done

...



...

return k.srv.blockingQuery(

&args.QueryOptions,

&reply.QueryMeta,

func(ws memdb.WatchSet, state *state.Store) error

index, ent, err := state.KVSGet(ws, args.Key)

...

if ent == nil

if index == 0

reply.Index = 1

else

reply.Index = index



reply.Entries = nil

else

reply.Index = ent.ModifyIndex

reply.Entries = structs.DirEntries{ent}



return nil

...



server 阻塞查询(kv get 的读取最终在这里执行)

func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,fn queryFn) error

...

开启一致性读

if queryOpts.RequireConsistent

if err := s.consistentRead(); err != nil

return err



...

err := fn(ws, state)

...



agent 真实 kv put(http 入口)

func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface, error)

...

if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil

    ...



...



rpc kv put 接口

func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error

...

非leader则转发

if done, err := k.srv.forward("KVS.Apply", args, args, reply); done

return err



...

resp, err := k.srv.raftApply(structs.KVSRequestType,args)

...



raft apply

func (s *Server) raftApply(t structs.MessageType, msg interface) (interface, error)

...

future := s.raft.Apply(buf, enqueueLimit)

if err := future.Error(); err != nil

return nil, err



return future.Response(), nil

...



raft apply真实入口

func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture

...

logFuture := &logFuture{

log: Log{

Type: LogCommand,

Data: cmd,

},

}

logFuture.init()

select

case <-timer:

return errorFuture{ErrEnqueueTimeout}

case <-r.shutdownCh:

return errorFuture{ErrRaftShutdown}

case r.applyCh <- logFuture:

return logFuture



...



leaderloop中apply此log

func (r *Raft) leaderLoop()

...

case <-r.leaderState.commitCh:

...

r.processLogs(idx, commitLog)

...

case newLog := <-r.applyCh:

ready := []*logFuture{newLog}

批量:尽量多取出 applyCh 中已排队的日志(最多 MaxAppendEntries 条),一次性分发

for i := 0; i < r.conf.MaxAppendEntries; i++

select

case <-r.leaderState.commitCh:

case newLog := <-r.applyCh:

ready = append(ready, newLog)

default:break

...

r.dispatchLogs(ready)

...



存储日志,通知日志复制

func (r *Raft) dispatchLogs(applyLogs []*logFuture)

...

r.leaderState.inflight.PushBack(applyLog)

...

if err := r.logs.StoreLogs(logs); err != nil

...



...

for _, f := range r.leaderState.replState

asyncNotifyCh(f.triggerCh)



...

...



复制循环

func (r *Raft) replicate(s *followerReplication)

...

for !shouldStop

select

case <-s.triggerCh:

lastLogIdx, _ := r.getLastLog()

shouldStop = r.replicateTo(s, lastLogIdx)





... 



启动复制流程

func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool)

...

if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil

...



if resp.Success

updateLastAppended(s,&req)

...



...

日志追加成功后,更新该 follower 的复制状态

func updateLastAppended(s *followerReplication, req *AppendEntriesRequest)

...

if logs := req.Entries; len(logs) > 0

last := logs[len(logs)-1]

s.nextIndex = last.Index + 1

s.commitment.match(s.peer.ID, last.Index)



...



记录该节点已匹配的日志索引,索引前进时重新计算提交点

func (c *commitment) match(server ServerID, matchIndex uint64)

c.Lock()

defer c.Unlock()

if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev

c.matchIndexes[server] = matchIndex

c.recalculate()





检查是否已经达到多数派

func (c *commitment) recalculate()

  ...

  quorumMatchIndex := matched[(len(matched)-1)/2]

if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex

c.commitIndex = quorumMatchIndex

asyncNotifyCh(c.commitCh)





批量响应的入口

func (r *Raft) processLogs(index uint64, future *logFuture)

...

r.processLog(&future.log, future)

...



响应客户端的key put(推测位于 processLog 内,原文未给出函数签名)

...

if future != nil

future.respond(nil)



...

以上是关于consul-agent的主要内容,如果未能解决你的问题,请参考以下文章