以太坊源码分析--p2p节点发现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了以太坊源码分析--p2p节点发现相关的知识,希望对你有一定的参考价值。
参考技术A节点发现功能主要涉及 Server \\ Table \\ udp 这几个数据结构,它们有独自的事件响应循环,节点发现功能便是它们互相协作完成的。其中,每个以太坊客户端启动后都会在本地运行一个 Server ,并将网络拓扑中相邻的节点视为 Node ,而 Table 是 Node 的容器, udp 则是负责维持底层的连接。下面重点描述它们中重要的字段和事件循环处理的关键部分。
PrivateKey - 本节点的私钥,用于与其他节点建立时的握手协商
Protocols - 支持的所有上层协议
StaticNodes - 预设的静态 Peer ,节点启动时会首先去向它们发起连接,建立邻居关系
newTransport - 下层传输层实现,定义握手过程中的数据加密解密方式,默认的传输层实现是用 newRLPX() 创建的 rlpx ,这不是本文的重点
ntab - 典型实现是 Table ,所有 peer 以 Node 的形式存放在 Table
ourHandshake - 与其他节点建立连接时的握手信息,包含本地节点的版本号以及支持的上层协议
addpeer - 连接握手完成后,连接过程通过这个通道通知 Server
Server 的监听循环,启动底层监听socket,当收到连接请求时,Accept后调用 setupConn() 开始连接建立过程
Server的主要事件处理和功能实现循环
Node 唯一表示网络上的一个节点
IP - IP地址
UDP/TCP - 连接使用的UDP/TCP端口号
ID - 以太坊网络中唯一标识一个节点,本质上是一个椭圆曲线公钥(PublicKey),与 Server 的 PrivateKey 对应。一个节点的IP地址不一定是固定的,但ID是唯一的。
sha - 用于节点间的距离计算
Table 主要用来管理与本节点与其他节点的连接的建立\\更新\\删除
bucket - 所有 peer 按与本节点的距离远近放在不同的桶(bucket)中,详见之后的 节点维护
refreshReq - 更新 Table 请求通道
Table 的主要事件循环,主要负责控制 refresh 和 revalidate 过程。
refresh.C - 定时(30s)启动Peer刷新过程的定时器
refreshReq - 接收其他线程投递到 Table 的 刷新Peer连接 的通知,当收到该通知时启动更新,详见之后的 更新邻居关系
revalidate.C - 定时重新检查以连接节点的有效性的定时器,详见之后的 探活检测
udp 负责节点间通信的底层消息控制,是 Table 运行的 Kademlia 协议的底层组件
conn - 底层监听端口的连接
addpending - udp 用来接收 pending 的channel。使用场景为:当我们向其他节点发送数据包后(packet)后可能会期待收到它的回复,pending用来记录一次这种还没有到来的回复。举个例子,当我们发送ping包时,总是期待对方回复pong包。这时就可以将构造一个pending结构,其中包含期待接收的pong包的信息以及对应的callback函数,将这个pengding投递到udp的这个channel。 udp 在收到匹配的pong后,执行预设的callback。
gotreply - udp 用来接收其他节点回复的通道,配合上面的addpending,收到回复后,遍历已有的pending链表,看是否有匹配的pending。
Table - 和 Server 中的ntab是同一个 Table
udp 的处理循环,负责控制消息的向上递交和收发控制
udp 的底层接受数据包循环,负责接收其他节点的 packet
以太坊使用 Kademlia 分布式路由存储协议来进行网络拓扑维护,了解该协议建议先阅读 易懂分布式 。更权威的资料可以查看 wiki 。总的来说该协议:
源码中由 Table 结构保存所有 bucket , bucket 结构如下
节点可以在 entries 和 replacements 互相转化,一个 entries 节点如果 Validate 失败,那么它会被原本将一个原本在 replacements 数组的节点替换。
有效性检测就是利用 ping 消息进行探活操作。 Table.loop() 启动了一个定时器(0~10s),定期随机选择一个bucket,向其 entries 中末尾的节点发送 ping 消息,如果对方回应了 pong ,则探活成功。
Table.loop() 会定期(定时器超时)或不定期(收到refreshReq)地进行更新邻居关系(发现新邻居),两者都调用 doRefresh() 方法,该方法对在网络上查找离自身和三个随机节点最近的若干个节点。
Table 的 lookup() 方法用来实现节点查找目标节点,它的实现就是 Kademlia 协议,通过节点间的接力,一步一步接近目标。
当一个节点启动后,它会首先向配置的静态节点发起连接,发起连接的过程称为 Dial ,源码中通过创建 dialTask 跟踪这个过程
dialTask表示一次向其他节点主动发起连接的任务
在 Server 启动时,会调用 newDialState() 根据预配置的 StaticNodes 初始化一批 dialTask , 并在 Server.run() 方法中,启动这些这些任务。
Dial 过程需要知道目标节点( dest )的IP地址,如果不知道的话,就要先使用 recolve() 解析出目标的IP地址,怎么解析?就是先要用借助 Kademlia 协议在网络中查找目标节点。
当得到目标节点的IP后,下一步便是建立连接,这是通过 dialTask.dial() 建立连接
连接建立的握手过程分为两个阶段,在在 SetupConn() 中实现
第一阶段为 ECDH密钥建立 :
第二阶段为协议握手,互相交换支持的上层协议
如果两次握手都通过,dialTask将向 Server 的 addpeer 通道发送 peer 的信息
区块链入门教程以太坊源码分析p2p-dial.go源码分析
dial.go在p2p里面主要负责建立链接的部分工作。 比如发现建立链接的节点。 与节点建立链接。 通过discover来查找指定节点的地址。等功能。
dial.go里面利用一个dailstate的数据结构来存储中间状态,是dial功能里面的核心数据结构。
// dialstate schedules dials and discovery lookups.
// it get‘s a chance to compute new tasks on every iteration
// of the main loop in Server.run.
type dialstate struct {
maxDynDials int //最大的动态节点链接数量
ntab discoverTable //discoverTable 用来做节点查询的
netrestrict *netutil.Netlist
lookupRunning bool
dialing map[discover.NodeID]connFlag //正在链接的节点
lookupBuf []*discover.Node // current discovery lookup results //当前的discovery查询结果
randomNodes []*discover.Node // filled from Table //从discoverTable随机查询的节点
static map[discover.NodeID]*dialTask //静态的节点。
hist *dialHistory
start time.Time // time when the dialer was first used
bootnodes []*discover.Node // default dials when there are no peers //这个是内置的节点。 如果没有找到其他节点。那么使用链接这些节点。
}
dailstate的创建过程。
func newDialState(static []*discover.Node, bootnodes []*discover.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
s := &dialstate{
maxDynDials: maxdyn,
ntab: ntab,
netrestrict: netrestrict,
static: make(map[discover.NodeID]*dialTask),
dialing: make(map[discover.NodeID]connFlag),
bootnodes: make([]*discover.Node, len(bootnodes)),
randomNodes: make([]*discover.Node, maxdyn/2),
hist: new(dialHistory),
}
copy(s.bootnodes, bootnodes)
for _, n := range static {
s.addStatic(n)
}
return s
}
dail最重要的方法是newTasks方法。这个方法用来生成task。 task是一个接口。有一个Do的方法。
type task interface {
Do(*Server)
}
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
if s.start == (time.Time{}) {
s.start = now
}
var newtasks []task
//addDial是一个内部方法, 首先通过checkDial检查节点。然后设置状态,最后把节点增加到newtasks队列里面。
addDial := func(flag connFlag, n *discover.Node) bool {
if err := s.checkDial(n, peers); err != nil {
log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
return false
}
s.dialing[n.ID] = flag
newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
return true
}
// Compute number of dynamic dials necessary at this point.
needDynDials := s.maxDynDials
//首先判断已经建立的连接的类型。如果是动态类型。那么需要建立动态链接数量减少。
for _, p := range peers {
if p.rw.is(dynDialedConn) {
needDynDials--
}
}
//然后再判断正在建立的链接。如果是动态类型。那么需要建立动态链接数量减少。
for _, flag := range s.dialing {
if flag&dynDialedConn != 0 {
needDynDials--
}
}
// Expire the dial history on every invocation.
s.hist.expire(now)
// Create dials for static nodes if they are not connected.
//查看所有的静态类型。如果可以那么也创建链接。
for id, t := range s.static {
err := s.checkDial(t.dest, peers)
switch err {
case errNotWhitelisted, errSelf:
log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
delete(s.static, t.dest.ID)
case nil:
s.dialing[id] = t.flags
newtasks = append(newtasks, t)
}
}
// If we don‘t have any peers whatsoever, try to dial a random bootnode. This
// scenario is useful for the testnet (and private networks) where the discovery
// table might be full of mostly bad peers, making it hard to find good ones.
//如果当前还没有任何链接。 而且20秒(fallbackInterval)内没有创建任何链接。 那么就使用bootnode创建链接。
if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
bootnode := s.bootnodes[0]
s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
s.bootnodes = append(s.bootnodes, bootnode)
if addDial(dynDialedConn, bootnode) {
needDynDials--
}
}
// Use random nodes from the table for half of the necessary
// dynamic dials.
//否则使用1/2的随机节点创建链接。
randomCandidates := needDynDials / 2
if randomCandidates > 0 {
n := s.ntab.ReadRandomNodes(s.randomNodes)
for i := 0; i < randomCandidates && i < n; i++ {
if addDial(dynDialedConn, s.randomNodes[i]) {
needDynDials--
}
}
}
// Create dynamic dials from random lookup results, removing tried
// items from the result buffer.
i := 0
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
if addDial(dynDialedConn, s.lookupBuf[i]) {
needDynDials--
}
}
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
// Launch a discovery lookup if more candidates are needed.
// 如果就算这样也不能创建足够动态链接。 那么创建一个discoverTask用来再网络上查找其他的节点。放入lookupBuf
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
s.lookupRunning = true
newtasks = append(newtasks, &discoverTask{})
}
// Launch a timer to wait for the next node to expire if all
// candidates have been tried and no task is currently active.
// This should prevent cases where the dialer logic is not ticked
// because there are no pending events.
// 如果当前没有任何任务需要做,那么创建一个睡眠的任务返回。
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
t := &waitExpireTask{s.hist.min().exp.Sub(now)}
newtasks = append(newtasks, t)
}
return newtasks
}
checkDial方法, 用来检查任务是否需要创建链接。
func (s *dialstate) checkDial(n *discover.Node, peers map[discover.NodeID]*Peer) error {
_, dialing := s.dialing[n.ID]
switch {
case dialing: //正在创建
return errAlreadyDialing
case peers[n.ID] != nil: //已经链接了
return errAlreadyConnected
case s.ntab != nil && n.ID == s.ntab.Self().ID: //建立的对象不是自己
return errSelf
case s.netrestrict != nil && !s.netrestrict.Contains(n.IP): //网络限制。 对方的IP地址不在白名单里面。
return errNotWhitelisted
case s.hist.contains(n.ID): // 这个ID曾经链接过。
return errRecentlyDialed
}
return nil
}
taskDone方法。 这个方法再task完成之后会被调用。 查看task的类型。如果是链接任务,那么增加到hist里面。 并从正在链接的队列删除。 如果是查询任务。 把查询的记过放在lookupBuf里面。
func (s *dialstate) taskDone(t task, now time.Time) {
switch t := t.(type) {
case *dialTask:
s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
delete(s.dialing, t.dest.ID)
case *discoverTask:
s.lookupRunning = false
s.lookupBuf = append(s.lookupBuf, t.results...)
}
}
dialTask.Do方法,不同的task有不同的Do方法。 dailTask主要负责建立链接。 如果t.dest是没有ip地址的。 那么尝试通过resolve查询ip地址。 然后调用dial方法创建链接。 对于静态的节点。如果第一次失败,那么会尝试再次resolve静态节点。然后再尝试dial(因为静态节点的ip是配置的。 如果静态节点的ip地址变动。那么我们尝试resolve静态节点的新地址,然后调用链接。)
func (t *dialTask) Do(srv *Server) {
if t.dest.Incomplete() {
if !t.resolve(srv) {
return
}
}
success := t.dial(srv, t.dest)
// Try resolving the ID of static nodes if dialing failed.
if !success && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
}
}
}
resolve方法。这个方法主要调用了discover网络的Resolve方法。如果失败,那么超时再试
// resolve attempts to find the current endpoint for the destination
// using discovery.
//
// Resolve operations are throttled with backoff to avoid flooding the
// discovery network with useless queries for nodes that don‘t exist.
// The backoff delay resets when the node is found.
func (t *dialTask) resolve(srv *Server) bool {
if srv.ntab == nil {
log.Debug("Can‘t resolve node", "id", t.dest.ID, "err", "discovery is disabled")
return false
}
if t.resolveDelay == 0 {
t.resolveDelay = initialResolveDelay
}
if time.Since(t.lastResolved) < t.resolveDelay {
return false
}
resolved := srv.ntab.Resolve(t.dest.ID)
t.lastResolved = time.Now()
if resolved == nil {
t.resolveDelay *= 2
if t.resolveDelay > maxResolveDelay {
t.resolveDelay = maxResolveDelay
}
log.Debug("Resolving node failed", "id", t.dest.ID, "newdelay", t.resolveDelay)
return false
}
// The node was found.
t.resolveDelay = initialResolveDelay
t.dest = resolved
log.Debug("Resolved node", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)})
return true
}
dial方法,这个方法进行了实际的网络连接操作。 主要通过srv.SetupConn方法来完成, 后续再分析Server.go的时候再分析这个方法。
// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
fd, err := srv.Dialer.Dial(dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
return false
}
mfd := newMeteredConn(fd, false)
srv.SetupConn(mfd, t.flags, dest)
return true
}
discoverTask和waitExpireTask的Do方法,
func (t *discoverTask) Do(srv *Server) {
// newTasks generates a lookup task whenever dynamic dials are
// necessary. Lookups need to take some time, otherwise the
// event loop spins too fast.
next := srv.lastLookup.Add(lookupInterval)
if now := time.Now(); now.Before(next) {
time.Sleep(next.Sub(now))
}
srv.lastLookup = time.Now()
var target discover.NodeID
rand.Read(target[:])
t.results = srv.ntab.Lookup(target)
}
func (t waitExpireTask) Do(*Server) {
time.Sleep(t.Duration)
}
以上是关于以太坊源码分析--p2p节点发现的主要内容,如果未能解决你的问题,请参考以下文章