使用 Async Rust 构建简单的 P2P 节点

Posted 小乔的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Async Rust 构建简单的 P2P 节点相关的知识,希望对你有一定的参考价值。

使用 Async Rust 构建简单的 P2P 节点

P2P 简介

  • P2P:peer-to-peer
  • P2P 是一种网络技术,可以在不同的计算机之间共享各种计算资源,如 CPU、网络带宽和存储。
  • P2P 是当今用户在线共享文件(如音乐、图像和其他数字媒体)的一种非常常用的方法。
    • Bittorrent 和 Gnutella 是流行的文件共享 p2p 应用程序的例子。以及比特币和以太坊等区块链网络。
    • 它们不依赖中央服务器或中介来连接多个客户端。
    • 最重要的是,它们利用用户的计算机作为客户端和服务器,从而将计算从中央服务器上卸载下来。
  • 传统的分布式系统使用 Client-Server 范式来部署
  • P2P 是另一种分布式系统
    • 在 P2P 中,一组节点(或对等点,Peer)彼此直接交互以共同提供公共服务,而无需中央协调器或管理员
    • P2P 系统中的每个节点(或 Peer)都可以充当客户端(从其他节点请求信息)和服务器(存储/检索数据并响应客户端请求执行必要的计算)。
    • P2P 网络中的所有节点不必完全相同,一个关键特征将 Client-Server 网络与 P2P 网络区分开来:缺乏具有唯一权限的专用服务器。在开放、无许可的 P2P 网络中,任何节点都可以决定提供与 P2P 节点相关的全部或部分服务集。

P2P 的特点

  • 与 Client-Server 网络相比,P2P 网络能够在其上构建不同类别的应用程序,这些应用程序是无许可、容错和抗审查的。
    • 无许可:因为数据和状态是跨多个节点复制的,所以没有服务器可以切断客户机对信息的访问。
    • 容错性:因为没有单点故障,例如中央服务器。
    • 抗审查:如区块链等网络。
    • P2P 计算还可以更好地利用资源。

P2P 的复杂性

  • 构建 P2P 系统要比传统 Client-Server 的系统复杂
    • 传输:P2P 网络中的每个 Peer 都可以使用不同的协议,例如HTTP(s)、TCP、UDP等。
    • 身份:每个 Peer 都需要知道其想要连接并发送消息的 Peer 的身份。
    • 安全性:每个 Peer 都应该能够以安全的方式与其他 Peer 通信,而不存在第三方拦截或修改消息的风险等。
    • 路由:每个 Peer 可以通过各种路由(例如数据包在 IP 协议中的分布方式)从其他 Peer 接收消息,这意味着如果消息不是针对自身的,则每个 Peer 都应该能够将消息路由到其他 Peer。
    • 消息传递:P2P 网络应该能够发送点对点消息或组消息(以发布/订阅模式)。

P2P 的要求 - 传输

  • TCP/IP 和 UDP 协议无处不在,在编写网络应用程序时非常流行。但还有其他更高级别的协议,如 HTTP(TCP上分层)和 QUIC(UDP上分层)。
  • P2P 网络中的每个 Peer 都应该能够启动到另一个节点的连接,并且由于网络中 peer 的多样性,能够通过多个协议监听传入的连接。

P2P 的要求 - Peer 身份

  • 与 web 开发领域不同,在 web 开发领域中,服务器由唯一的域名标识(例如 www.rust-lang.org,然后使用域名服务将其解析为服务器的IP地址)
  • P2P 网络中的节点需要唯一身份,以便其他节点可以访问它们。
  • P2P 网络中的节点使用公钥和私钥对(非对称公钥加密)与其他节点建立通信。
    • P2P 网络中的节点的身份称为 PeerId,是节点公钥的加密散列。

P2P 的要求 - 安全

  • 加密密钥对和 PeerId 使节点能够与它的 peers 建立安全、经过身份验证的通信通道。但这只是安全的一个方面。
  • 节点还需要实现授权框架,该框架为哪个节点可以执行何种操作建立规则。
  • 还有需要解决的网络级安全威胁,如 sybil 攻击(其中一个节点运营商利用不同身份启动大量节点,以获得网络中的优势地位)或 eclipse 攻击(其中一组恶意节点共谋以特定节点为目标,使后者无法到达任何合法节点)。

P2P 的要求 - Peer 路由

  • P2P 网络中的节点首先需要找到其他 peer 才能进行通信。这是通过维护 peer 路由表来实现的,该表包含对网络中其他 peer 的引用。
  • 但是,在具有数千个或更多动态变化的节点(即节点加入和离开网络)的 P2P 网络中,任何单个节点都难以为网络中的所有节点维护完整而准确的路由表。Peer 路由使节点能够将不是给自己准备的消息路由到目标节点。

P2P 的要求 - 消息

  • P2P 网络中的节点可以向特定节点发送消息,但也可以参与广播消息协议。
    • 例如,发布/订阅,其中节点注册对特定主题的兴趣(订阅),发送该主题消息的任何节点(发布)都由订阅该主题的所有节点接收。这种技术通常用于将消息的内容传输到整个网络。

P2P 的要求 - 流多路复用

  • 流多路复用(Stream multiplexing)是通过公共通信链路发送多个信息流的一种方法。
  • 在 P2P 的情况下,它允许多个独立的“逻辑”流共享一个公共 P2P 传输层。
    • 当考虑到一个节点与不同 peers 具有多个通信流的可能性,或者两个远程节点之间也可能存在多个并发连接的可能性时,这一点变得很重要。
    • 流多路复用有助于优化 peer 之间建立连接的开销。

注意:多路复用在后端服务开发中很常见,其中客户端可以与服务器建立底层网络连接,然后通过底层网络连接多路复用不同的流(每个流具有唯一的端口号)。

Libp2p

  • libp2p 是一个由协议、规范和库组成的模块化系统,它支持 P2P 应用程序的开发。
  • 它目前支持三种语言:JS、Go、Rust
    • 未来将支持 Haskell、Java、Python等
  • 它被许多流行的项目使用,例如:IPFS、Filecoin 和 Polkadot 等。

Libp2p 的主要模块

  • 传输(Transport):负责从一个 peer 到另一个 peer 的数据的实际传输和接收
  • 身份(Identity):libp2p 使用公钥密钥(PKI)作为 peer 节点身份的基础。使用加密算法为每个节点生成唯一的 peer id。
  • 安全(Security):节点使用其私钥对消息进行签名。节点之间的传输连接可以升级为安全的加密通道,以便远程 peer 可以相互信任,并且没有第三方可以拦截它们之间的通信。
  • Peer 发现(Peer Discovery):允许 peer 在 libp2p 网络中查找并相互通信。
  • Peer 路由(Peer Routing):使用其他 peer 的知识信息来实现与 peer 节点的通信。
  • 内容发现(Content Discovery):在不知道哪个 peer 节点拥有该内容的情况下,允许 peer 节点从其他 peer 节点获取部分内容。
  • 消息(Messaging):其中发布/订阅:允许向对某个主题感兴趣的一组 peer 发送消息。

P2P 节点的身份

P2P Node

PeerId: 12d3k.....

~/rust via 

Rust 中 async/await 的目的是啥?

【中文标题】Rust 中 async/await 的目的是啥?【英文标题】:What is the purpose of async/await in Rust?Rust 中 async/await 的目的是什么? 【发布时间】:2019-03-21 00:03:25 【问题描述】:

在像 C# 这样的语言中,给出这段代码(我不是故意使用 await 关键字):

async Task Foo()

    var task = LongRunningOperationAsync();

    // Some other non-related operation
    AnotherOperation();

    result = task.Result;

在第一行中,长操作在另一个线程中运行,并返回Task(即未来)。然后,您可以执行另一个与第一个并行运行的操作,最后,您可以等待操作完成。我认为这也是async/await在Python、JavaScript等中的行为。

另一方面,在 Rust 中,我在 the RFC 中读到:

Rust 的期货与其他语言的期货之间的根本区别在于,除非被轮询,否则 Rust 的期货不会做任何事情。整个系统都是围绕这个构建的:例如,取消正是因为这个原因而放弃了未来。相比之下,在其他语言中,调用 async fn 会启动一个立即开始执行的 future。

在这种情况下,async/await 在 Rust 中的作用是什么?看到其他语言,这种表示法是一种运行并行操作的便捷方式,但如果调用 async 函数没有运行任何东西,我看不出它在 Rust 中是如何工作的。

【问题讨论】:

对于它的价值,Python 中的异步函数也会立即产生,并且只有在事件循环要求它们时才开始执行任何操作。该设计与 Rust 的非常相似。 c++ 也有延期期货!! 【参考方案1】:

您混淆了一些概念。

Concurrency is not parallelism、asyncawait并发的工具,这有时可能意味着它们也是并行工具。

此外,future 是否立即被轮询与选择的语法是正交的。

async / await

关键字asyncawait 的存在使创建异步代码并与之交互更容易阅读,并且看起来更像“普通”同步代码。据我所知,在所有具有此类关键字的语言中都是如此。

更简单的代码

这段代码创建了一个在轮询时将两个数字相加的未来

之前

fn long_running_operation(a: u8, b: u8) -> impl Future<Output = u8> 
    struct Value(u8, u8);

    impl Future for Value 
        type Output = u8;

        fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> 
            Poll::Ready(self.0 + self.1)
        
    

    Value(a, b)

之后

async fn long_running_operation(a: u8, b: u8) -> u8 
    a + b

请注意,“之前”代码基本上是implementation of today's poll_fn function

另请参阅Peter Hall's answer,了解如何更好地跟踪许多变量。

参考文献

async/await 的一个潜在令人惊讶的事情是它启用了一种以前不可能的特定模式:在期货中使用引用。下面是一些以异步方式用值填充缓冲区的代码:

之前

use std::io;

fn fill_up<'a>(buf: &'a mut [u8]) -> impl Future<Output = io::Result<usize>> + 'a 
    futures::future::lazy(move |_| 
        for b in buf.iter_mut()  *b = 42 
        Ok(buf.len())
    )


fn foo() -> impl Future<Output = Vec<u8>> 
    let mut data = vec![0; 8];
    fill_up(&mut data).map(|_| data)

编译失败:

error[E0597]: `data` does not live long enough
  --> src/main.rs:33:17
   |
33 |     fill_up_old(&mut data).map(|_| data)
   |                 ^^^^^^^^^ borrowed value does not live long enough
34 | 
   | - `data` dropped here while still borrowed
   |
   = note: borrowed value must be valid for the static lifetime...

error[E0505]: cannot move out of `data` because it is borrowed
  --> src/main.rs:33:32
   |
33 |     fill_up_old(&mut data).map(|_| data)
   |                 ---------      ^^^ ---- move occurs due to use in closure
   |                 |              |
   |                 |              move out of `data` occurs here
   |                 borrow of `data` occurs here
   |
   = note: borrowed value must be valid for the static lifetime...

之后

use std::io;

async fn fill_up(buf: &mut [u8]) -> io::Result<usize> 
    for b in buf.iter_mut()  *b = 42 
    Ok(buf.len())


async fn foo() -> Vec<u8> 
    let mut data = vec![0; 8];
    fill_up(&mut data).await.expect("IO failed");
    data

这行得通!

调用async 函数不会运行任何东西

另一方面,Future 和围绕期货的整个系统的实现和设计与关键字asyncawait 无关。事实上,在 async / await 关键字出现之前,Rust 就有一个蓬勃发展的异步生态系统(例如 Tokio)。 JavaScript 也是如此。

为什么Futures 在创建时不立即轮询?

要获得最权威的答案,请查看 RFC 拉取请求中的 this comment from withoutboats:

Rust 的期货与其他期货的根本区别 语言是 Rust 的未来不会做任何事情,除非被轮询。这 整个系统都是围绕这个构建的:例如,取消是 正是因为这个原因,放弃了未来。相比之下,在其他 语言,调用 async fn 会启动一个开始执行的未来 马上。

关于这一点的一点是,Rust 中的 async 和 await 并不是天生的 并发构造。如果你的程序只使用 async & await 并且没有并发原语,您程序中的代码将 以定义的、静态已知的线性顺序执行。显然,大多数 程序将使用某种并发来调度多个, 事件循环上的并发任务,但他们不必这样做。这是什么 意味着您可以-琐碎地-在本地保证订购 某些事件,即使在两者之间执行了非阻塞 IO 他们希望与更大的非本地集合异步 事件(例如,您可以严格控制 a 内事件的顺序 请求处理程序,同时与许多其他请求并发 处理程序,甚至在等待点的两侧)。

这个属性为 Rust 的 async/await 语法提供了一种本地的 推理和低级控制使 Rust 成为现在的样子。跑起来 到第一个等待点不会本质上违反这一点 - 你会 仍然知道代码何时执行,它只会分两步执行 不同的地方取决于它是在一个之前还是之后 等待。但是,我认为其他语言做出的决定开始 立即执行很大程度上源于他们的系统 调用 async fn 时立即同时安排任务 (例如,这是我得到的潜在问题的印象 来自 Dart 2.0 文档)。

this discussion from munificent 涵盖了 Dart 2.0 的一些背景:

大家好,我在 Dart 团队。 Dart 的 async/await 主要是由 Erik Meijer,他也从事 C# 的 async/await 工作。在 C# 中,异步/等待 与第一个等待同步。对于 Dart,Erik 和其他人认为 C# 的模型太混乱了,而是指定了 async 函数在执行任何代码之前总是产生一次。

当时,我和我团队中的另一个人的任务是成为 豚鼠尝试我们的新的进行中的语法和语义 包管理器。基于那个经验,我们觉得异步函数 应该与第一个等待同步运行。我们的论点是 主要是:

    总是让步一次会无缘无故地降低性能。在大多数情况下,这无关紧要,但在某些情况下确实如此 做。即使在你可以忍受它的情况下,流血也是一种拖累 无处不在的小性能。

    总是让步意味着某些模式无法使用 async/await 实现。特别是,有这样的代码真的很常见 (这里是伪代码):

    getThingFromNetwork():
      if (downloadAlreadyInProgress):
        return cachedFuture
    
      cachedFuture = startDownload()
      return cachedFuture
    

    换句话说,您有一个异步操作,您可以在它完成之前多次调用它。以后的调用使用相同的 先前创建的未决未来。你想确保你不开始 多次操作。这意味着您需要同步 在开始操作之前检查缓存。

    如果 async 函数从一开始就是异步的,那么上面的函数就不能使用 async/await。

我们为我们的案子辩护,但最终语言设计者坚持 从顶部异步。这是几年前的事了。

原来是打错电话了。性能成本是真实的 足以让许多用户产生“异步函数是 慢”并开始避免使用它,即使在性能命中的情况下 是负担得起的。更糟糕的是,我们看到令人讨厌的并发错误 认为他们可以在函数的顶部做一些同步工作,并且 沮丧地发现他们创造了竞争条件。总的来说,它 似乎用户不会自然地假设 async 函数之前会产生 执行任何代码。

因此,对于 Dart 2,我们现在将非常痛苦的突破性更改 将异步函数更改为与第一个等待同步并 通过该过渡迁移我们所有现有的代码。我很高兴 我们正在做出改变,但我真的希望我们做了正确的事 第一天。

我不知道 Rust 的所有权和性能模型是否有所不同 对你的限制,从顶部异步确实更好, 但根据我们的经验,sync-to-the-first-await 显然更好 Dart 的权衡。

cramert replies(注意其中一些语法现在已经过时了):

如果您需要在调用函数时立即执行代码 而不是稍后轮询未来时,您可以编写您的 函数如下:

fn foo() -> impl Future<Item=Thing> 
    println!("prints immediately");
    async_block! 
        println!("prints when the future is first polled");
        await!(bar());
        await!(baz())
    

代码示例

这些示例使用 Rust 1.39 和 futures crate 0.3.1 中的异步支持。

C# 代码的文字转录

use futures; // 0.3.1

async fn long_running_operation(a: u8, b: u8) -> u8 
    println!("long_running_operation");

    a + b


fn another_operation(c: u8, d: u8) -> u8 
    println!("another_operation");

    c * d


async fn foo() -> u8 
    println!("foo");

    let sum = long_running_operation(1, 2);

    another_operation(3, 4);

    sum.await


fn main() 
    let task = foo();

    futures::executor::block_on(async 
        let v = task.await;
        println!("Result: ", v);
    );

如果你调用foo,Rust 中的事件顺序将是:

    返回实现 Future&lt;Output = u8&gt; 的内容。

就是这样。尚未完成“实际”工作。如果您获取foo 的结果并推动它完成(通过轮询,在本例中是通过futures::executor::block_on),那么接下来的步骤是:

    调用long_running_operation 返回了实现Future&lt;Output = u8&gt; 的东西(它还没有开始工作)。

    another_operation 确实有效,因为它是同步的。

    .await 语法导致long_running_operation 中的代码启动。 foo 未来将继续返回“未准备好”,直到计算完成。

输出将是:

foo
another_operation
long_running_operation
Result: 3

注意这里没有线程池:这都是在一个线程上完成的。

async

您也可以使用async 块:

use futures::future, FutureExt; // 0.3.1

fn long_running_operation(a: u8, b: u8) -> u8 
    println!("long_running_operation");

    a + b


fn another_operation(c: u8, d: u8) -> u8 
    println!("another_operation");

    c * d


async fn foo() -> u8 
    println!("foo");

    let sum = async  long_running_operation(1, 2) ;
    let oth = async  another_operation(3, 4) ;

    let both = future::join(sum, oth).map(|(sum, _)| sum);

    both.await

这里我们将同步代码包装在 async 块中,然后等待两个操作完成,然后此函数才会完成。

请注意,像这样包装同步代码不是对于实际需要很长时间的任何事情都是一个好主意;请参阅What is the best approach to encapsulate blocking I/O in future-rs? 了解更多信息。

使用线程池

// Requires the `thread-pool` feature to be enabled 
use futures::executor::ThreadPool, future, task::SpawnExt, FutureExt;

async fn foo(pool: &mut ThreadPool) -> u8 
    println!("foo");

    let sum = pool
        .spawn_with_handle(async  long_running_operation(1, 2) )
        .unwrap();
    let oth = pool
        .spawn_with_handle(async  another_operation(3, 4) )
        .unwrap();

    let both = future::join(sum, oth).map(|(sum, _)| sum);

    both.await

【讨论】:

对不起,这还不清楚。你有一个 Rust 代码的例子,它和我写的 C# 代码做同样的事情吗?我的意思是:有 2 个操作与 async/await 异步运行。 @Boiethios 您仍然可以在一个异步函数中生成多个“子”期货,并将它们一起join 我认为开头的句子可能是“您将两个概念混为一谈:并发和并行”。 Async/Await 是一种启用并发的语法。例如,Python 生成器是并发的(生成器维护自己的堆栈,与调用者堆栈并发)但不并行运行。并行性需要并发性,但没有并行性,并发性是有用的。 第一个示例的函数体比需要的复杂得多。在稳定的 Rust 中,您可以简单地使用 poll_fn(|| a + b) 并完成它。 async/await在我看来的主要优势是可以跨yield点借钱,目前这是不可能的。 @SvenMarnach 我同意这些参考资料,并且我一直在努力进行更新以展示这一点。然而,我会争论复杂性,正如我所展示的is basically what poll_fn is implemented as,以一点可重用性为模。【参考方案2】:

考虑这个简单的伪 JavaScript 代码,它获取一些数据,处理它,根据上一步获取更多数据,对其进行汇总,然后打印结果:

getData(url)
   .then(response -> parseObjects(response.data))
   .then(data -> findAll(data, 'foo'))
   .then(foos -> getWikipediaPagesFor(foos))
   .then(sumPages)
   .then(sum -> console.log("sum is: ", sum));

async/await 形式中,即:

async 
    let response = await getData(url);
    let objects = parseObjects(response.data);
    let foos = findAll(objects, 'foo');
    let pages = await getWikipediaPagesFor(foos);
    let sum = sumPages(pages);
    console.log("sum is: ", sum);

它引入了很多一次性变量,并且可以说比带有承诺的原始版本更糟糕。那何必呢?

考虑这一变化,以后在计算中需要变量 responseobjects

async 
    let response = await getData(url);
    let objects = parseObjects(response.data);
    let foos = findAll(objects, 'foo');
    let pages = await getWikipediaPagesFor(foos);
    let sum = sumPages(pages, objects.length);
    console.log("sum is: ", sum, " and status was: ", response.status);

并尝试用 promise 将其重写为原始形式:

getData(url)
   .then(response -> Promise.resolve(parseObjects(response.data))
       .then(objects -> Promise.resolve(findAll(objects, 'foo'))
           .then(foos -> getWikipediaPagesFor(foos))
           .then(pages -> sumPages(pages, objects.length)))
       .then(sum -> console.log("sum is: ", sum, " and status was: ", response.status)));

每次您需要参考上一个结果时,您都需要将整个结构嵌套更深一层。这很快就会变得非常难以阅读和维护,但async/await 版本不会遇到这个问题。

【讨论】:

在 Rust 中编写了一些“累积”代码后,不得不构建元组然后随着函数的运行时间越来越长有选择地从它们中挑选出来确实很烦人。【参考方案3】:

async/await 在 Rust 中的目的是提供一个并发工具包——与 C# 和其他语言中的相同。

在 C# 和 JavaScript 中,async 方法立即开始运行,无论您是否await 结果,它们都会被调度。在 Python 和 Rust 中,当您调用 async 方法时,在您 await 之前,什么都不会发生(它甚至没有被安排)。但无论哪种方式,它的编程风格基本相同。

库提供了生成另一个任务(与当前任务同时运行并独立于当前任务运行)的能力:请参阅async_std::task::spawntokio::task::spawn


至于为什么 Rust async 并不完全像 C#,好吧,考虑一下这两种语言之间的差异:

Rust 不鼓励全局可变状态。在 C# 和 JS 中,每个 async 方法调用都会隐式添加到全局可变队列中。这是某些隐式上下文的副作用。不管好坏,这不是 Rust 的风格。

Rust 不是一个框架。 C# 提供了一个默认的事件循环是有道理的。它还提供了一个很棒的垃圾收集器!其他语言中的许多标准内容都是 Rust 中的可选库。

【讨论】:

感谢您的回答。它为async/await 为何如此而增添了新的色彩。

以上是关于使用 Async Rust 构建简单的 P2P 节点的主要内容,如果未能解决你的问题,请参考以下文章

rust中的异步编程

Rust async 编程

Rust 中项目构建管理工具 Cargo简单介绍

RUST Ex00 Async-std

Rust定时组件async-rs / futures-timer实现原理

「Rust语言」用Rust构建微服务