如何同时运行包含借用 TcpStream 的期货?

Posted

技术标签:

【中文标题】如何同时运行包含借用 TcpStream 的期货?【英文标题】:How to run futures containing borrowed TcpStream concurrently? 【发布时间】:2020-02-02 10:18:48 【问题描述】:

我试图让这段代码 sn-p 并发运行而不是顺序运行,因为对等点的数量可能很大。我正在使用async_std 1.4 和锈1.41

pub struct Peer 
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],


async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) -> Result<()> 
    for peer in peers.values() 
        let mut stream = &*peer.tcp_stream;
        stream.write_all(&bincode::serialize(&message)?).await?;
    
    Ok(())

我尝试使用futures::future::join_all 方法没有任何运气,因为包装我在async_std::task::spawn 中创建和使用的未来需要静态生命周期。这是我尝试过的:

async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) 
    let handles = peers.values().into_iter().map(|peer| 
        task::spawn(
            async 
                let mut stream = &*peer.tcp_stream;
                if let Err(err) = stream
                    .write_all(&bincode::serialize(&message).unwrap())
                    .await
                
                    error!("Error when writing to tcp_stream: ", err);
                
            
        )
    );
    futures::future::join_all(handles).await;

我确定我缺少一些方法,感谢您的帮助!

【问题讨论】:

【参考方案1】:

由于您尝试同时发送消息,因此每个任务都必须拥有自己的消息副本:

use async_std::task, net::TcpStream;
use futures::future, io::AsyncWriteExt;
use serde::Serialize;
use std::
    collections::HashMap,
    error::Error,
    sync::Arc,
;

pub struct Peer 
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],


#[derive(Serialize)]
struct Protocol;

async fn send_to_all_peers(
    message: Protocol,
    peers: &HashMap<String, Peer>)
    -> Result<(), Box<dyn Error>>

    let msg = bincode::serialize(&message)?;
    let handles = peers.values()
        .map(|peer| 
            let msg = msg.clone();
            let socket = peer.tcp_stream.clone();
            task::spawn(async move 
                let mut socket = &*socket;
                socket.write_all(&msg).await
            )
        );

    future::try_join_all(handles).await?;
    Ok(())

【讨论】:

谢谢!这解决了它。我希望我可以绕过克隆Arc&lt;TcpStream&gt;,但我想这是不可能安全的。【参考方案2】:

你有没有尝试过类似的东西

let handles = peers.values().into_iter().map(|peer| 
   let mut stream = &*peer.tcp_stream;
   stream.write_all(&bincode::serialize(&message).unwrap())

let results = futures::future::join_all(handles).await

?

注意 .map 闭包如何不等待,而是直接返回一个未来,然后将其传递给 join_all,然后等待。

【讨论】:

我已经尝试过另一个生命周期问题,这是错误的基本要点:rust stream.write_all(message) returns a value referencing data owned by the current function

以上是关于如何同时运行包含借用 TcpStream 的期货?的主要内容,如果未能解决你的问题,请参考以下文章

如何在Rust中读写XML到TcpStream?

在结构中声明 TcpStream 的 Rust 问题

可变地借用一个结构字段,同时在闭包中借用另一个

怎么用期货做风险对冲(如何利用期货对冲风险)

如何得到外盘期货交易所实时数据?及API接口?

如何处理 TcpStream 上的并行读写?