Tendermint P2P源码分析

Posted 小圣.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tendermint P2P源码分析相关的知识,希望对你有一定的参考价值。

前言

官方文档:https://docs.tendermint.com/master/spec/p2p/messages/

message

Tendermint的P2P 中的消息分为两部分:channel和message。

P2P配置

P2P的相关配置位于$TMHOME/config/config.toml文件的[p2p]一节中(各项含义可参考官方的配置说明)。
配置截取如下:

#######################################################
###           P2P Configuration Options             ###
#######################################################
[p2p]

# Enable the new p2p layer.
disable-legacy = false

# Select the p2p internal queue
queue-type = "priority"

# Address to listen for incoming connections
laddr = "tcp://0.0.0.0:26656"

# Address to advertise to peers for them to dial
# If empty, will use the same port as the laddr,
# and will introspect on the listener or use UPnP
# to figure out the address.
external-address = ""

# Comma separated list of seed nodes to connect to
seeds = ""

# Comma separated list of nodes to keep persistent connections to
persistent-peers = ""

# UPNP port forwarding
upnp = false

# Path to address book
addr-book-file = "config/addrbook.json"

# Set true for strict address routability rules
# Set false for private or local networks
addr-book-strict = true

# Maximum number of inbound peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-inbound-peers = 40

# Maximum number of outbound peers to connect to, excluding persistent peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-outbound-peers = 10

# Maximum number of connections (inbound and outbound).
max-connections = 64

# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = 100

# List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditional-peer-ids = ""

# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
persistent-peers-max-dial-period = "0s"

# Time to wait before flushing messages out on the connection
flush-throttle-timeout = "100ms"

# Maximum size of a message packet payload, in bytes
max-packet-msg-payload-size = 1400

# Rate at which packets can be sent, in bytes/second
send-rate = 5120000

# Rate at which packets can be received, in bytes/second
recv-rate = 5120000

# Set true to enable the peer-exchange reactor
pex = true

# Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
private-peer-ids = ""

# Toggle to disable guard against peers connecting from the same ip.
allow-duplicate-ip = false

# Peer connection configuration.
handshake-timeout = "20s"
dial-timeout = "3s"

启用P2P

在node/node.go的NewNode里,会发现这样一段代码。

// setup Transport and Switch
	// 创建switch
	sw := createSwitch(
		config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
		stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
	)

	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
	if err != nil {
		return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
	}

	err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
	if err != nil {
		return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
	}
	// 设置地址簿
	addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
	if err != nil {
		return nil, fmt.Errorf("could not create addrbook: %w", err)
	}

	// Optionally, start the pex reactor
	//
	// TODO:
	//
	// We need to set Seeds and PersistentPeers on the switch,
	// since it needs to be able to use these (and their DNS names)
	// even if the PEX is off. We can include the DNS name in the NetAddress,
	// but it would still be nice to have a clear list of the current "PersistentPeers"
	// somewhere that we can return with net_info.
	//
	// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
	// Note we currently use the addrBook regardless at least for AddOurAddress
	var (
		pexReactor   *pex.Reactor
		pexReactorV2 *pex.ReactorV2
	)
	// 创建PEX
	if config.P2P.PexReactor {
		pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
		pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
		if err != nil {
			return nil, err
		}

		router.AddChannelDescriptors(pexReactor.GetChannels())
	}

所以说,P2P的启动入口应该是先启动Switch实例

switch

Switch负责处理peer连接,并公开一个API,让各个Reactor接收传入的消息。每个“Reactor”负责处理一个或多个“Channel”上的传入消息。因此,传出消息的发送通常在peer上执行,而传入消息则在reactor上接收。简单来说,Switch的作用就是把各个reactor连接起来,在它们和peer之间进行信息交换。

至于reactor,在p2p/base_reactor.go里对Reactor的含义及作用进行了解释。简单来说,它就是和整个P2P网络进行交互的组件。在Tendermint中,一共有六个Reactor:mempool、blockchain、consensus、evidence、pex、statesync。

Switch结构

// Switch holds the state needed to manage peer connections and to route
// channel traffic to the reactors registered for each channel.
type Switch struct {
	// Embedded BaseService: the switch is started and stopped through the
	// common service lifecycle (see OnStart).
	service.BaseService

	// P2P configuration.
	config       *config.P2PConfig
	// All registered reactors, keyed by a string
	// (presumably the reactor name; confirm at the registration site).
	reactors     map[string]Reactor
	// Channel descriptors and the channel-ID -> reactor mapping.
	// chDescs is handed to the MConn transport in OnStart and
	// reactorsByCh is handed to every peer created in acceptRoutine.
	chDescs      []*conn.ChannelDescriptor
	reactorsByCh map[byte]Reactor
	// Set of peers.
	peers        *PeerSet
	dialing      *cmap.CMap
	reconnecting *cmap.CMap
	nodeInfo     NodeInfo // our node info
	nodeKey      NodeKey  // our node privkey
	addrBook     AddrBook
	// peers addresses with whom we'll maintain constant connection
	persistentPeersAddrs []*NetAddress
	unconditionalPeerIDs map[NodeID]struct{}

	transport Transport

	filterTimeout time.Duration
	peerFilters   []PeerFilterFunc
	connFilters   []ConnFilterFunc
	conns         ConnSet

	rng *rand.Rand // seed for randomizing dial times and orders

	metrics *Metrics
}

创建Switch

// NewSwitch builds a Switch around the given P2P config and transport.
//
// All collections start out empty, metrics default to the no-op
// implementation, and the supplied options are applied last, in the order
// given, so they can override any of the defaults set here.
func NewSwitch(
	cfg *config.P2PConfig,
	transport Transport,
	options ...SwitchOption,
) *Switch {
	sw := new(Switch)

	sw.config = cfg
	sw.reactors = make(map[string]Reactor)
	sw.chDescs = make([]*conn.ChannelDescriptor, 0)
	sw.reactorsByCh = make(map[byte]Reactor)
	sw.peers = NewPeerSet()
	sw.dialing = cmap.NewCMap()
	sw.reconnecting = cmap.NewCMap()
	sw.metrics = NopMetrics()
	sw.transport = transport
	sw.persistentPeersAddrs = make([]*NetAddress, 0)
	sw.unconditionalPeerIDs = make(map[NodeID]struct{})
	sw.filterTimeout = defaultFilterTimeout
	sw.conns = NewConnSet()

	// Fresh random source used for randomizing dial times and orders.
	sw.rng = rand.NewRand()

	// The switch itself is the service implementation behind BaseService.
	sw.BaseService = *service.NewBaseService(nil, "P2P Switch", sw)

	for i := range options {
		options[i](sw)
	}

	return sw
}

启动Switch

启动switch,它会启动所有的reactors 和peers

// OnStart starts every registered reactor and then launches the goroutine
// that accepts inbound peers.
//
// It returns an error as soon as one reactor fails to start; in that case the
// accept routine is not launched.
func (sw *Switch) OnStart() error {
	// FIXME: Temporary hack to pass channel descriptors to MConn transport,
	// since they are not available when it is constructed. This will be
	// fixed when we implement the new router abstraction.
	if mconnTransport, isMConn := sw.transport.(*MConnTransport); isMConn {
		mconnTransport.channelDescs = sw.chDescs
	}

	// Start all reactors first.
	for _, r := range sw.reactors {
		if err := r.Start(); err != nil {
			return fmt.Errorf("failed to start %v: %w", r, err)
		}
	}

	// Then begin accepting inbound peers in the background.
	go sw.acceptRoutine()

	return nil
}

acceptRoutine

// acceptRoutine loops accepting inbound connections from the transport. Each
// accepted connection is handshaked and filtered, wrapped in a peer, checked
// against the inbound-peer limit and finally added to the switch.
//
// Rejected connections and filter timeouts are logged and skipped. The loop
// ends when the transport reports that it is closed; any other accept error
// causes a panic.
func (sw *Switch) acceptRoutine() {
	for {
		var peerNodeInfo NodeInfo
		// Accept the next inbound connection from the transport.
		c, err := sw.transport.Accept()
		if err == nil {
			// Before the peer can be used, a handshake must be performed on the
			// connection. The old MConn transport did the handshaking in
			// Accept(), which was asynchronous and avoided
			// head-of-line-blocking. With handshakes migrated out of the
			// transport, we do it synchronously here.
			// The handshake yields the remote node's info.
			peerNodeInfo, _, err = sw.handshakePeer(c, "")
		}
		if err == nil {
			// NOTE(review): unchecked type assertion; this panics if the
			// transport hands back anything other than *mConnConnection.
			err = sw.filterConn(c.(*mConnConnection).conn)
		}
		if err != nil {
			if c != nil {
				_ = c.Close()
			}
			// io.EOF is treated as the transport having been closed.
			if err == io.EOF {
				err = ErrTransportClosed{}
			}
			switch err := err.(type) {
			case ErrRejected:
				// Avoid connecting to ourselves.
				if err.IsSelf() {
					// Remove the given address from the address book and add to our addresses
					// to avoid dialing in the future.
					addr := err.Addr()
					sw.addrBook.RemoveAddress(&addr)
					sw.addrBook.AddOurAddress(&addr)
				}

				sw.Logger.Info(
					"Inbound Peer rejected",
					"err", err,
					"numPeers", sw.peers.Size(),
				)

				continue
			// The peer filter timed out.
			case ErrFilterTimeout:
				sw.Logger.Error(
					"Peer filter timed out",
					"err", err,
				)

				continue
			// The transport has been closed: log and fall through to the
			// break below, which ends the accept loop.
			case ErrTransportClosed:
				sw.Logger.Error(
					"Stopped accept routine, as transport is closed",
					"numPeers", sw.peers.Size(),
				)
			default:
				sw.Logger.Error(
					"Accept on transport errored",
					"err", err,
					"numPeers", sw.peers.Size(),
				)
				// We could instead have a retry loop around the acceptRoutine,
				// but that would need to stop and let the node shutdown eventually.
				// So might as well panic and let process managers restart the node.
				// There's no point in letting the node run without the acceptRoutine,
				// since it won't be able to accept new connections.
				panic(fmt.Errorf("accept routine exited: %v", err))
			}

			// Only reached for ErrTransportClosed; exits the enclosing for loop.
			break
		}

		// The peer is persistent only if its address resolves and is one of
		// our persistent peers.
		isPersistent := false
		addr, err := peerNodeInfo.NetAddress()
		if err == nil {
			isPersistent = sw.IsPeerPersistent(addr)
		}

		// Create the new peer instance (first newPeerConn argument is false
		// for this inbound connection).
		p := newPeer(
			peerNodeInfo,
			newPeerConn(false, isPersistent, c),
			sw.reactorsByCh,
			sw.StopPeerForError,
			PeerMetrics(sw.metrics),
		)

		if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
			// Ignore the connection if we already have enough inbound peers.
			_, in, _ := sw.NumPeers()
			if in >= sw.config.MaxNumInboundPeers {
				sw.Logger.Info(
					"Ignoring inbound connection: already have enough inbound peers",
					"address", p.SocketAddr(),
					"have", in,
					"max", sw.config.MaxNumInboundPeers,
				)
				_ = p.CloseConn()
				continue
			}

		}

		// Add the peer to the switch; on failure close and clean up the peer.
		if err := sw.addPeer(p); err != nil {
			_ = p.CloseConn()
			if p.IsRunning() {
				_ = p.Stop()
			}
			sw.conns.RemoveAddr(p.RemoteAddr())
			sw.Logger.Info(
				"Ignoring inbound connection: error while adding peer",
				"err", err,
				"id", p.ID(),
			)
		}
	}
}

peer

peer在p2p中表示一个对等体。 在tendermint中也是它和应用程序之间进行直接的消息交互。 peer实现了Peer这个接口的定义。

Peer的结构

// peer is the state kept for a single remote peer: its connection, its node
// info, the reactors that serve its channels, and its metrics.
type peer struct {
	service.BaseService

	// Raw peerConn and the multiplex connection.
	// In the code shown here the peerConn is built with newPeerConn inside
	// the switch's acceptRoutine and passed to newPeer.
	peerConn

	// peer's node info and the channel it knows about
	// channels = nodeInfo.Channels
	// cached to avoid copying nodeInfo in hasChannel
	nodeInfo    NodeInfo
	channels    []byte
	reactors    map[byte]Reactor
	onPeerError func(Peer, interface{})

	// User data.
	Data *cmap.CMap

	metrics       *Metrics
	metricsTicker *time.Ticker
}

创建Peer

创建peer实例,负责初始化一个peer实例。

// newPeer assembles a peer from the remote node's info, an established
// peerConn, the channel-ID -> reactor map and the error callback.
//
// Metrics default to the no-op implementation and a metrics ticker is created
// immediately; the given options are applied last, in order, and may override
// those defaults.
func newPeer(
	nodeInfo NodeInfo,
	pc peerConn,
	reactorsByCh map[byte]Reactor,
	onPeerError func(Peer, interface{}),
	options ...PeerOption,
) *peer {
	created := &peer{
		peerConn: pc, // the already-established connection

		nodeInfo: nodeInfo,
		channels: nodeInfo.Channels, // TODO
		reactors: reactorsByCh,

		onPeerError: onPeerError,

		Data: cmap.NewCMap(),

		metricsTicker: time.NewTicker(metricsTickerDuration),
		metrics:       NopMetrics(),
	}

	// The peer itself is the service implementation behind BaseService.
	created.BaseService = *service.NewBaseService(nil, "Peer", created)

	for i := range options {
		options[i](created)
	}

	return created
}

启动Peer

// OnStart runs the embedded BaseService's OnStart and, if that succeeds,
// launches the peer's two background goroutines.
func (p *peer) OnStart() error {
	err := p.BaseService.OnStart()
	if err != nil {
		return err
	}

	// Handle messages received from the connection.
	go p.processMessages()

	// Report peer metrics in the background
	// (presumably paced by metricsTicker; confirm in metricsReporter).
	go p.metricsReporter()

	return nil
}

MConnection

transport的功能实现最终会到p2p/conn/connection.go文件里的MConnection上,所以MConnection是P2P最底层的部分。消息的写入和读取都是通过此组件完成的。它维护了网络连接、进行底层的网络数据传输。

MConnection是一个多路连接,在单个TCP连接上支持多个独立的流,具有不同的服务质量保证。每个流被称为一个channel,每个channel都有一个全局的唯一字节ID,每个channel都有优先级。每个channel的id和优先级在初始连接时配置。

MConnection支持三种类型的packet:

  • Ping
  • Pong
  • Msg

当我们在pingTimeout规定的时间内没有收到任何消息时,我们就会发出一个ping消息,当一个对等端收到ping消息并且没有别的附加消息时,对等端就会回应一个pong消息。如果我们在规定的时间内没有收到pong消息,我们就会断开连接。

创建MConnection

// NewMConnectionWithConfig wraps conn in an MConnection with one Channel per
// descriptor in chDescs.
//
// onReceive and onError are stored as the connection's callbacks. The function
// panics if config.PongTimeout is not strictly less than config.PingInterval.
func NewMConnectionWithConfig(
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
	config MConnConfig,
) *MConnection {
	if config.PingInterval <= config.PongTimeout {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		// The underlying, already-established network connection.
		conn: conn,

		// Buffered reader/writer over the net.Conn, so the stream can be
		// read and written much like file I/O.
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),

		sendMonitor: flow.New(0, 0),
		recvMonitor: flow.New(0, 0),

		send: make(chan struct{}, 1),
		pong: make(chan struct{}, 1),

		onReceive: onReceive,
		onError:   onError,
		config:    config,
		created:   time.Now(),
	}

	// Build the channels, kept both in descriptor order and indexed by ID.
	byID := make(map[byte]*Channel, len(chDescs))
	ordered := make([]*Channel, 0, len(chDescs))
	for i := range chDescs {
		ch := newChannel(mconn, *chDescs[i])
		byID[ch.desc.ID] = ch
		ordered = append(ordered, ch)
	}
	mconn.channels = ordered
	mconn.channelsIdx = byID

	mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}

channel

// Channel is one logical stream of an MConnection, described by a
// ChannelDescriptor and carrying its own send queue and buffers.
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte	// send queue
	sendQueueSize int32 // atomic.
	recving       []byte	// receive buffer
	sending       []byte	// send buffer
	recentlySent  int64 // exponential moving average

	maxPacketMsgPayloadSize int

	Logger log.Logger
}

Peer调用Send发送消息其实是调用MConnection的Send方法,而MConnection的Send其实也只是把内容发送到Channel的sendQueue中,然后会有专门的routine读取Channel进行实际的消息发送。

启动MConnection

// OnStart runs the embedded BaseService's OnStart, creates the timers and
// signalling channels the connection needs, and then launches the send and
// receive goroutines.
func (c *MConnection) OnStart() error {
	err := c.BaseService.OnStart()
	if err != nil {
		return err
	}

	// Signalling channels shared with the send/receive routines.
	c.pongTimeoutCh = make(chan bool, 1)
	c.quitSendRoutine = make(chan struct{})
	c.doneSendRoutine = make(chan struct{})
	c.quitRecvRoutine = make(chan struct{})

	// Timers: throttled flush, periodic ping, periodic channel stats.
	c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
	c.pingTimer = time.NewTicker(c.config.PingInterval)
	c.chStatsTimer = time.NewTicker(updateStats)

	// Everything above must be in place before the loops start.
	go c.sendRoutine() // send loop
	go c.recvRoutine() // receive loop

	return nil
}

sendRoutine

func (c *MConnection) sendRoutine() {
	defer c._recover()

	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)

FOR_LOOP:
	for {
		var _n int
		var err error
	SELECTION:
		select {
		// 进行周期性的flush
		case <-c.flushTimer.C

以上是关于Tendermint P2P源码分析的主要内容,如果未能解决你的问题,请参考以下文章

[Ethereum] 以太坊源码分析p2p+eth

[Ethereum] 以太坊源码分析p2p+eth

[Ethereum] 以太坊源码分析p2p+eth

Tendermint 共识分析

Tendermint 共识分析

Tendermint 共识分析