Tendermint P2P Source Code Analysis
Posted by 小圣.
Preface
Official documentation: https://docs.tendermint.com/master/spec/p2p/messages/
Messages
Messages in Tendermint's P2P layer have two parts: the channel a message travels on and the message payload itself. Each message type belongs to a channel, and each channel is identified by a single byte ID.
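Concretely, each reactor declares the channels it owns through channel descriptors. The sketch below shows the shape of one (the field names follow conn.ChannelDescriptor; the ID, priority, and capacity values are invented for illustration, not what any real reactor registers):

	// An illustrative channel descriptor; the values are made up.
	desc := &conn.ChannelDescriptor{
		ID:                  0x99, // globally unique one-byte channel ID
		Priority:            5,    // scheduling weight inside MConnection
		SendQueueCapacity:   100,  // buffered outbound messages
		RecvMessageCapacity: 1024, // max size of an inbound message, in bytes
	}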
P2P Configuration
The P2P options live in the $TMHOME/config/config.toml file; the relevant section is excerpted below:
#######################################################
### P2P Configuration Options ###
#######################################################
[p2p]
# Enable the new p2p layer.
disable-legacy = false
# Select the p2p internal queue
queue-type = "priority"
# Address to listen for incoming connections
laddr = "tcp://0.0.0.0:26656"
# Address to advertise to peers for them to dial
# If empty, will use the same port as the laddr,
# and will introspect on the listener or use UPnP
# to figure out the address.
external-address = ""
# Comma separated list of seed nodes to connect to
seeds = ""
# Comma separated list of nodes to keep persistent connections to
persistent-peers = ""
# UPNP port forwarding
upnp = false
# Path to address book
addr-book-file = "config/addrbook.json"
# Set true for strict address routability rules
# Set false for private or local networks
addr-book-strict = true
# Maximum number of inbound peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-inbound-peers = 40
# Maximum number of outbound peers to connect to, excluding persistent peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-outbound-peers = 10
# Maximum number of connections (inbound and outbound).
max-connections = 64
# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = 100
# List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditional-peer-ids = ""
# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
persistent-peers-max-dial-period = "0s"
# Time to wait before flushing messages out on the connection
flush-throttle-timeout = "100ms"
# Maximum size of a message packet payload, in bytes
max-packet-msg-payload-size = 1400
# Rate at which packets can be sent, in bytes/second
send-rate = 5120000
# Rate at which packets can be received, in bytes/second
recv-rate = 5120000
# Set true to enable the peer-exchange reactor
pex = true
# Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
private-peer-ids = ""
# Toggle to disable guard against peers connecting from the same ip.
allow-duplicate-ip = false
# Peer connection configuration.
handshake-timeout = "20s"
dial-timeout = "3s"
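These keys map onto the exported fields of config.P2PConfig. A hedged sketch of setting a few of them programmatically (the fields mirror the TOML keys above, and config.DefaultP2PConfig fills in the defaults):

	// Programmatic counterpart of the [p2p] section above.
	cfg := config.DefaultP2PConfig()
	cfg.ListenAddress = "tcp://0.0.0.0:26656" // laddr
	cfg.PexReactor = true                     // pex
	cfg.MaxNumInboundPeers = 40               // max-num-inbound-peers
	cfg.SendRate = 5120000                    // send-rate, in bytes/second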
Enabling P2P
In NewNode in node/node.go you will find the following code:
// setup Transport and Switch
// Create the switch.
sw := createSwitch(
	config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
	stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
	return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
	return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

// Set up the address book.
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
	return nil, fmt.Errorf("could not create addrbook: %w", err)
}

// Optionally, start the pex reactor
//
// TODO:
//
// We need to set Seeds and PersistentPeers on the switch,
// since it needs to be able to use these (and their DNS names)
// even if the PEX is off. We can include the DNS name in the NetAddress,
// but it would still be nice to have a clear list of the current "PersistentPeers"
// somewhere that we can return with net_info.
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var (
	pexReactor   *pex.Reactor
	pexReactorV2 *pex.ReactorV2
)

// Create the PEX reactor.
if config.P2P.PexReactor {
	pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
	pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
	if err != nil {
		return nil, err
	}

	router.AddChannelDescriptors(pexReactor.GetChannels())
}
So the P2P startup path begins with creating and starting a Switch instance.
Switch
The Switch handles peer connections and exposes an API for receiving incoming messages. Each Reactor is responsible for handling incoming messages on one or more Channels. So while sending an outgoing message is typically performed on the peer, incoming messages are received on the reactor. In short, the Switch ties the reactors together and shuttles information between them and the network.
As for reactors, p2p/base_reactor.go explains what a Reactor is and what it does: it is the component through which a module interacts with the P2P network. Tendermint wires up six reactors: mempool, blockchain (block sync), consensus, evidence, statesync, and pex.
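For orientation, the Reactor interface in p2p/base_reactor.go has roughly the following shape (paraphrased; consult the file for the authoritative signatures):

	// Reactor is what each module implements to plug into the P2P layer.
	type Reactor interface {
		service.Service // Start, Stop

		// SetSwitch is called by the Switch before it starts the reactor.
		SetSwitch(*Switch)

		// GetChannels returns the channel descriptors this reactor owns.
		GetChannels() []*conn.ChannelDescriptor

		// InitPeer, AddPeer and RemovePeer track the peer lifecycle.
		InitPeer(peer Peer) Peer
		AddPeer(peer Peer)
		RemovePeer(peer Peer, reason interface{})

		// Receive is called when a message arrives on one of this
		// reactor's channels.
		Receive(chID byte, peer Peer, msgBytes []byte)
	}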
The Switch struct

type Switch struct {
	// Embeds BaseService so the switch can be started and stopped
	// uniformly with the other services.
	service.BaseService

	// The P2P section of the node configuration.
	config *config.P2PConfig

	// All registered reactors, keyed by name.
	reactors map[string]Reactor

	// The mapping between channels and reactors; it is handed to each
	// peer and from there down to the underlying MConnection.
	chDescs      []*conn.ChannelDescriptor
	reactorsByCh map[byte]Reactor

	// The set of connected peers.
	peers        *PeerSet
	dialing      *cmap.CMap
	reconnecting *cmap.CMap
	nodeInfo     NodeInfo // our node info
	nodeKey      NodeKey  // our node privkey
	addrBook     AddrBook

	// peers addresses with whom we'll maintain constant connection
	persistentPeersAddrs []*NetAddress
	unconditionalPeerIDs map[NodeID]struct{}

	transport Transport

	filterTimeout time.Duration
	peerFilters   []PeerFilterFunc
	connFilters   []ConnFilterFunc
	conns         ConnSet

	rng *rand.Rand // seed for randomizing dial times and orders

	metrics *Metrics
}
Creating a Switch
func NewSwitch(
	cfg *config.P2PConfig,
	transport Transport,
	options ...SwitchOption,
) *Switch {
	sw := &Switch{
		config:               cfg,
		reactors:             make(map[string]Reactor),
		chDescs:              make([]*conn.ChannelDescriptor, 0),
		reactorsByCh:         make(map[byte]Reactor),
		peers:                NewPeerSet(),
		dialing:              cmap.NewCMap(),
		reconnecting:         cmap.NewCMap(),
		metrics:              NopMetrics(),
		transport:            transport,
		persistentPeersAddrs: make([]*NetAddress, 0),
		unconditionalPeerIDs: make(map[NodeID]struct{}),
		filterTimeout:        defaultFilterTimeout,
		conns:                NewConnSet(),
	}

	// Ensure we have a completely undeterministic PRNG.
	sw.rng = rand.NewRand()

	sw.BaseService = *service.NewBaseService(nil, "P2P Switch", sw)

	for _, option := range options {
		option(sw)
	}

	return sw
}
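The options variadic is how callers customize a switch at construction time. A hedged usage sketch (myTransport, mempoolReactor and p2pMetrics are placeholders; WithMetrics is one of the SwitchOption constructors in the p2p package, but check switch.go for the full list):

	// Assembling a switch by hand; in practice NewNode does this via createSwitch.
	sw := p2p.NewSwitch(
		cfg.P2P,
		myTransport,
		p2p.WithMetrics(p2pMetrics), // SwitchOption: attach metrics
	)
	sw.AddReactor("MEMPOOL", mempoolReactor) // fills reactors, chDescs and reactorsByCh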
Starting the Switch
Starting the switch starts all of its reactors and begins accepting peers.
func (sw *Switch) OnStart() error {
	// FIXME: Temporary hack to pass channel descriptors to MConn transport,
	// since they are not available when it is constructed. This will be
	// fixed when we implement the new router abstraction.
	if t, ok := sw.transport.(*MConnTransport); ok {
		t.channelDescs = sw.chDescs
	}

	// Start all the registered reactors first.
	for _, reactor := range sw.reactors {
		err := reactor.Start()
		if err != nil {
			return fmt.Errorf("failed to start %v: %w", reactor, err)
		}
	}

	// Start accepting peers.
	go sw.acceptRoutine()

	return nil
}
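Callers never invoke OnStart directly; because Switch embeds service.BaseService, they go through the service lifecycle:

	// Start transitions the service state and calls OnStart underneath;
	// Stop mirrors it with OnStop.
	if err := sw.Start(); err != nil {
		panic(err)
	}
	defer func() { _ = sw.Stop() }()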
acceptRoutine
func (sw *Switch) acceptRoutine() {
	for {
		var peerNodeInfo NodeInfo

		// Accept a newly connected peer.
		c, err := sw.transport.Accept()
		if err == nil {
			// Before we can use the peer, we perform a handshake on the
			// connection. The previous (MConn) transport did the handshake
			// inside Accept(), asynchronously, which avoided
			// head-of-line blocking. With handshakes being migrated out
			// of the transport, we do them synchronously here for now.
			// The main purpose is to obtain the remote node's info.
			peerNodeInfo, _, err = sw.handshakePeer(c, "")
		}
		if err == nil {
			err = sw.filterConn(c.(*mConnConnection).conn)
		}
		if err != nil {
			if c != nil {
				_ = c.Close()
			}
			if err == io.EOF {
				err = ErrTransportClosed{}
			}
			switch err := err.(type) {
			case ErrRejected:
				// Avoid connecting to ourselves.
				if err.IsSelf() {
					// Remove the given address from the address book and add to our addresses
					// to avoid dialing in the future.
					addr := err.Addr()
					sw.addrBook.RemoveAddress(&addr)
					sw.addrBook.AddOurAddress(&addr)
				}

				sw.Logger.Info(
					"Inbound Peer rejected",
					"err", err,
					"numPeers", sw.peers.Size(),
				)
				continue
			case ErrFilterTimeout:
				// The peer filter timed out.
				sw.Logger.Error(
					"Peer filter timed out",
					"err", err,
				)
				continue
			case ErrTransportClosed:
				// The transport has been closed.
				sw.Logger.Error(
					"Stopped accept routine, as transport is closed",
					"numPeers", sw.peers.Size(),
				)
			default:
				sw.Logger.Error(
					"Accept on transport errored",
					"err", err,
					"numPeers", sw.peers.Size(),
				)
				// We could instead have a retry loop around the acceptRoutine,
				// but that would need to stop and let the node shutdown eventually.
				// So might as well panic and let process managers restart the node.
				// There's no point in letting the node run without the acceptRoutine,
				// since it won't be able to accept new connections.
				panic(fmt.Errorf("accept routine exited: %v", err))
			}

			break
		}

		isPersistent := false
		addr, err := peerNodeInfo.NetAddress()
		if err == nil {
			isPersistent = sw.IsPeerPersistent(addr)
		}

		// Create a new peer instance.
		p := newPeer(
			peerNodeInfo,
			newPeerConn(false, isPersistent, c),
			sw.reactorsByCh,
			sw.StopPeerForError,
			PeerMetrics(sw.metrics),
		)

		if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
			// Ignore the connection if we already have enough inbound peers.
			_, in, _ := sw.NumPeers()
			if in >= sw.config.MaxNumInboundPeers {
				sw.Logger.Info(
					"Ignoring inbound connection: already have enough inbound peers",
					"address", p.SocketAddr(),
					"have", in,
					"max", sw.config.MaxNumInboundPeers,
				)
				_ = p.CloseConn()
				continue
			}
		}

		// Add the peer to the switch.
		if err := sw.addPeer(p); err != nil {
			_ = p.CloseConn()
			if p.IsRunning() {
				_ = p.Stop()
			}
			sw.conns.RemoveAddr(p.RemoteAddr())
			sw.Logger.Info(
				"Ignoring inbound connection: error while adding peer",
				"err", err,
				"id", p.ID(),
			)
		}
	}
}
Peer
A peer represents a remote node in the P2P network; in Tendermint it is also the object through which the application exchanges messages directly with that node. The concrete peer type implements the Peer interface.
The peer struct
type peer struct {
	service.BaseService

	// The raw peerConn and the multiplexed connection. A peerConn is
	// created with newOutboundPeerConn or newInboundPeerConn, both of
	// which are called from the Switch.
	peerConn

	// peer's node info and the channel it knows about
	// channels = nodeInfo.Channels
	// cached to avoid copying nodeInfo in hasChannel
	nodeInfo    NodeInfo
	channels    []byte
	reactors    map[byte]Reactor
	onPeerError func(Peer, interface{})

	// User data.
	Data *cmap.CMap

	metrics       *Metrics
	metricsTicker *time.Ticker
}
Creating a peer
newPeer initializes a peer instance:
func newPeer(
	nodeInfo NodeInfo,
	pc peerConn,
	reactorsByCh map[byte]Reactor,
	onPeerError func(Peer, interface{}),
	options ...PeerOption,
) *peer {
	p := &peer{
		// Initialize the embedded peerConn.
		peerConn:      pc,
		nodeInfo:      nodeInfo,
		channels:      nodeInfo.Channels, // TODO
		reactors:      reactorsByCh,
		onPeerError:   onPeerError,
		Data:          cmap.NewCMap(),
		metricsTicker: time.NewTicker(metricsTickerDuration),
		metrics:       NopMetrics(),
	}

	p.BaseService = *service.NewBaseService(nil, "Peer", p)

	for _, option := range options {
		option(p)
	}

	return p
}
Starting a peer

func (p *peer) OnStart() error {
	// BaseService.OnStart() is a no-op that just returns nil; it is
	// called here only for consistency.
	if err := p.BaseService.OnStart(); err != nil {
		return err
	}

	// Process messages received on the connection.
	go p.processMessages()

	// Heartbeat-style metrics: every 10s, read the connection status
	// and record the send-queue size keyed by peer ID.
	go p.metricsReporter()

	return nil
}
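metricsReporter is close to the following sketch (simplified and written from memory, so treat the field and metric names as approximate):

	// Every tick, sum the pending send-queue sizes across all channels
	// of this peer's connection and record the total under the peer's ID.
	func (p *peer) metricsReporter() {
		for {
			select {
			case <-p.metricsTicker.C:
				status := p.conn.Status() // snapshot of the underlying MConnection
				var sendQueueSize float64
				for _, chStatus := range status.Channels {
					sendQueueSize += float64(chStatus.SendQueueSize)
				}
				p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
			case <-p.Quit():
				return
			}
		}
	}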
MConnection
The transport's functionality ultimately bottoms out in MConnection, defined in p2p/conn/connection.go, which makes MConnection the lowest layer of the P2P stack. All message reads and writes go through this component: it owns the network connection and performs the raw data transfer.
MConnection is a multiplexed connection: it carries multiple independent streams over a single TCP connection, with distinct quality-of-service guarantees per stream. Each stream is called a channel; every channel has a globally unique byte ID and a priority, both configured when the connection is first established.
MConnection supports three packet types:
- Ping
- Pong
- Msg
When no message has arrived within the ping interval, we send a ping. A peer that receives a ping, and has nothing else pending to send, answers with a pong. If the pong then fails to arrive within the pong timeout, we disconnect.
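That keep-alive logic can be modeled in isolation like this (a stand-alone sketch, not the actual send/recv routines; only the standard library time package is needed, and sendPing/closeConn stand in for the real packet plumbing):

	// keepAlive pings every pingInterval and expects a pong within
	// pongTimeout; if the deadline fires first, the connection is dropped.
	func keepAlive(pingInterval, pongTimeout time.Duration,
		sendPing func(), pongCh <-chan struct{}, closeConn func()) {

		pingTicker := time.NewTicker(pingInterval)
		defer pingTicker.Stop()

		timedOut := make(chan struct{}, 1)
		var pongTimer *time.Timer

		for {
			select {
			case <-pingTicker.C:
				sendPing()
				// Arm the pong deadline for this ping.
				pongTimer = time.AfterFunc(pongTimeout, func() { timedOut <- struct{}{} })
			case <-pongCh:
				// Pong arrived in time: disarm the deadline.
				if pongTimer != nil {
					pongTimer.Stop()
				}
			case <-timedOut:
				closeConn()
				return
			}
		}
	}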
Creating an MConnection

func NewMConnectionWithConfig(
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
	config MConnConfig,
) *MConnection {
	if config.PongTimeout >= config.PingInterval {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		// The net.Conn for the established TCP connection.
		conn: conn,
		// Wrap the net.Conn in bufio reader/writer so the TCP stream
		// can be read and written with buffered, file-like I/O.
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flow.New(0, 0),
		recvMonitor:   flow.New(0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
	}

	// Create the channels.
	var channelsIdx = map[byte]*Channel{}
	var channels = []*Channel{}

	for _, desc := range chDescs {
		channel := newChannel(mconn, *desc)
		channelsIdx[channel.desc.ID] = channel
		channels = append(channels, channel)
	}
	mconn.channels = channels
	mconn.channelsIdx = channelsIdx

	mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}
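A hedged usage sketch (tcpConn is an already-established net.Conn and chDesc a descriptor like the one shown earlier; conn.DefaultMConnConfig supplies the standard timeouts and rates):

	// The two callbacks are how received messages and fatal errors are
	// surfaced to the owner of the connection.
	onReceive := func(chID byte, msgBytes []byte) {
		fmt.Printf("channel 0x%X: received %d bytes\n", chID, len(msgBytes))
	}
	onError := func(err interface{}) {
		fmt.Println("connection error:", err)
	}

	mconn := conn.NewMConnectionWithConfig(
		tcpConn,
		[]*conn.ChannelDescriptor{chDesc},
		onReceive,
		onError,
		conn.DefaultMConnConfig(),
	)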
Channel

type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte // queue of outbound messages
	sendQueueSize int32       // atomic.
	recving       []byte      // buffer for the message currently being received
	sending       []byte      // buffer for the message currently being sent
	recentlySent  int64       // exponential moving average

	maxPacketMsgPayloadSize int

	Logger log.Logger
}
When a Peer sends a message with Send, it is really calling MConnection's Send method, and that in turn only pushes the payload onto the target Channel's sendQueue; a dedicated routine then drains the channels and performs the actual writes.
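Simplified from the real method (the IsRunning check and logging are elided), the enqueue side looks like this:

	// Send looks up the channel by its byte ID, enqueues the bytes, and
	// wakes sendRoutine through the buffered send signal channel.
	func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
		channel, ok := c.channelsIdx[chID]
		if !ok {
			return false
		}

		success := channel.sendBytes(msgBytes) // blocks until queued or a timeout hits
		if success {
			// Non-blocking wake-up: if a signal is already pending, skip.
			select {
			case c.send <- struct{}{}:
			default:
			}
		}
		return success
	}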
Starting the MConnection

func (c *MConnection) OnStart() error {
	if err := c.BaseService.OnStart(); err != nil {
		return err
	}

	// Throttle timer that batches writes before each flush.
	c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
	// Ping period.
	c.pingTimer = time.NewTicker(c.config.PingInterval)
	c.pongTimeoutCh = make(chan bool, 1)
	c.chStatsTimer = time.NewTicker(updateStats)
	c.quitSendRoutine = make(chan struct{})
	c.doneSendRoutine = make(chan struct{})
	c.quitRecvRoutine = make(chan struct{})

	// Send loop.
	go c.sendRoutine()
	// Receive loop.
	go c.recvRoutine()

	return nil
}
sendRoutine
func (c *MConnection) sendRoutine() {
	defer c._recover()

	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)

FOR_LOOP:
	for {
		var _n int
		var err error
	SELECTION:
		select {
		// Periodically flush the buffered writer.
		case <-c.flushTimer.Ch:
			c.flush()
		// The remaining cases (elided in this excerpt) update channel
		// stats, send pings, answer pongs, drain the per-channel send
		// queues, and handle shutdown.
		}
	}
}