浅谈 Gossipsub

Posted Netwarps

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈 Gossipsub相关的知识,希望对你有一定的参考价值。

情况概述

目前libp2p-rs团队在开发分支上初步完成了Gossip协议的相关工作,将会尽快发布。以下是Gossip协议的相关简单介绍:

从Pubsub说起

Pubsub的意思是publish/subscribe,即发布/订阅,是消息传播一种机制。任意一个节点都可以关注感兴趣的主题,并发布主题相关的消息,此消息将会被发送到任何订阅了该主题的节点上。Pubsub共有三种实现:Floodsub,Gossipsub,Randomsub。本文将主要分析floodsub和gossipsub。

Floodsub

Floodsub顾名思义,是一种效果类似于“泛洪”的发布/订阅机制。在libp2p-rs中,每一个节点通过floodsub收到相关的消息后,首先在本地进行相关的处理,然后通过再发布的形式扩散到周围的节点。

Floodsub的优点在于可维护性较强,且在消息的传播路径上能最小化网络延迟所带来的问题。但是它仍然有不足的地方,当某个节点拥有大量的连接节点时,转发消息可能将会带来极大的带宽问题。

Gossipsub

Gossipsub是在floodsub的基础上设计开发的一个协议。通过施加一些限制,以及设计一些相关的数据结构,解决了floodsub带来的问题。核心是维护mesh和fanout。

mesh&fanout

mesh是一个针对主题形成小范围的节点网格,结构如下:

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

这是一个以主题哈希为key,B树集合为value的哈希表。针对每一个主题,节点都会生成一棵B树集合,需要注意的是这棵B树有节点个数的最大最小值限制。每收到一个相关主题的消息,节点都会消息转发到对应的B树集合节点上,由于节点是有限的,此时将大幅度减少带宽的使用。

fanout则是一种比较特殊的网格,结构与mesh相同:

    /// Map of topics to list of peers that we publish to, but don\'t subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

fanout记录的是发布了消息但是没有进行订阅操作的主题与节点关系。即代表一个节点可以不订阅某个主题而直接发送与该主题相关的消息。fanout的构建只与publish有关。

fanout和topic_peers是构建mesh的关键。topic_peers的结构如下:

    /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
    topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,

topic_peers用来记录主题与已连接的节点ID对应关系。
构建mesh时,首先从fanout中查找数据,通过某些特定的条件筛选出节点添加到mesh;如果mesh当前节点数小于最小值,再通过topic_peers找出其他已知节点,完成mesh的构建。同时还将构建类型为GRAFT的消息并向外传播。

Control Message

Control Message是用来维护gossip的消息。共有四种类型:GRAFT,PRUNE,IHAVE,IWANT。

GRAFT,意为嫁接、移植。在gossip中,如果一个节点A订阅了某个主题,那么会向周围节点发布GRAFT消息,通知它们把A加入到mesh中。接收到消息后,周围节点会通过一系列的条件判断来决定是否A节点添加到mesh。

PRUNE与GRAFT正好相反,是从mesh中移除节点。PRUNE的触发场景一般是在取消订阅时,需要告知其他节点执行mesh移除的操作。

IHAVE是当前节点向外广播,告诉其他节点“我”的mesh有哪些主题以及这个mesh下所有已知的消息。如果有节点对某个主题感兴趣,就需要回复一个IWANT消息,双方才能进行信息的交换。

IWANT代表需要一个或者多个主题对应的消息。节点接收到IWANT消息后,会从本地消息缓存中查找对应的消息ID并返回。需要注意的是,为了防止恶意的多次请求,消息查找有一定的次数限制,这与GossipConfig的设置有关。

Message Cache

针对接收到的消息,Gossip提供全局的Message Cache进行缓存操作:

/// CacheEntry stored in the history.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheEntry {
    mid: MessageId,
    topic: TopicHash,
}

/// MessageCache struct holding history of messages.
#[derive(Clone)]
pub struct MessageCache {
    msgs: HashMap<MessageId, RawGossipsubMessage>,
    /// For every message and peer the number of times this peer asked for the message
    iwant_counts: HashMap<MessageId, HashMap<PeerId, u32>>,
    history: Vec<Vec<CacheEntry>>,
    /// The number of indices in the cache history used for gossipping. That means that a message
    /// won\'t get gossipped anymore when shift got called `gossip` many times after inserting the
    /// message in the cache.
    gossip: usize,
}

msgs记录的是消息ID与具体的消息内容;iwant_counts是上面提到的对消息查找的访问次数限制;history记录了历史的缓存信息;gossip代表最大可访问到的历史消息条数。

需要注意的是,Message Cache的缓存会在heartbeat执行的时候进行清理。

    /// Shift the history array down one and delete messages associated with the
    /// last entry.
    pub fn shift(&mut self) {
        for entry in self.history.pop().expect("history is always > 1") {
            if let Some(msg) = self.msgs.remove(&entry.mid) {
                if !msg.validated {
                    // If GossipsubConfig::validate_messages is true, the implementing
                    // application has to ensure that Gossipsub::validate_message gets called for
                    // each received message within the cache timeout time."
                    debug!(
                        "The message with id {} got removed from the cache without being validated.",
                        &entry.mid
                    );
                }
            }
            debug!("Remove message from the cache: {}", &entry.mid);

            self.iwant_counts.remove(&entry.mid);
        }

        // Insert an empty vec in position 0
        self.history.insert(0, Vec::new());
    }

首先将最早的历史消息数组从history中取出,其中可能包含了一条或多条消息,再从msgs中消除对应的Gossip Message,并重置消息的访问次数。最后再往history中新增一条空记录,确保下一次执行put操作时不会出现错误情况。

Heartbeat

Heartbeat维护了一个定期执行的心跳连接过程,在固定的频率下向外发送消息。功能上实现了几个效果:维持mesh和fanout的稳定,以及向外的广播。

维护mesh

mesh维护比较好理解,因为mesh本身有上下限的限制。mesh少了,从topic_peers中补充;多了就根据评分系统执行相关的裁剪功能。此外,对于mesh中的节点还需要更新评分,以确保系统更好的运行。

维护fanout

前文有提到,fanout的作用是记录没有订阅但是发布了消息的主题。对于每一个发布了消息的主题,有一个数据结构fanout_last_pub记录了发布的时间:

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

如果超过了Gossip预设的过期时间,这条主题就会从fanout中删除。

fanout的维护还与peer_topics有关。peer_topics是一个关联节点与主题的数据结构:

    /// A map of all connected peers to their subscribed topics.
    peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,

如果某个节点存在于fanout的某个主题中,但是在peer_topics中发现该节点已经不再关注此主题,就会被标记为需要移除。

fanout本身有节点的下限数限制,与mesh一样,都是从topic_peers中找出节点进行补充。

向外广播

这个机制是为了让不在mesh和fanout中,却又订阅了相关主题的节点,也有机会接收到主题相关的消息。并不是每个节点都一定会收到消息,随机选取节点的算法受到Config中的配置和关注主题的节点数两者共同影响。

首先将mesh和fanout合并成一个迭代器,从Message Cache中取出主题对应的消息ID,向topic_peers中随机选出的节点发送IHAVE消息。

总结

Gossip协议的东西比较多,上述内容只是其中的一小部分介绍,更详细的代码可参阅libp2p-rs后续的版本发布,感谢各位阅读。


Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通信及互联网行业有非常丰富的落地经验。Netwarps 目前在深圳、北京均设立了研发中心,团队规模30+,其中大部分为具备十年以上开发经验的技术人员,分别来自互联网、金融、云计算、区块链以及科研机构等专业领域。

Netwarps 专注于安全存储技术产品的研发与应用,主要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具有高可用、低功耗和低网络的技术特点,适用于物联网、工业互联网等场景。

公众号:Netwarps

以上是关于浅谈 Gossipsub的主要内容,如果未能解决你的问题,请参考以下文章

浅谈AngularJS中的$parse和$eval

从内部入手,浅谈malloc和new的区别

浅谈代码重构与优化

浅谈Android保护技术__代码混淆

浅谈代码风格及代码中封号问题

浅谈代码质量与开发模式