如何同时运行包含借用 TcpStream 的期货?
Posted
技术标签:
【中文标题】如何同时运行包含借用 TcpStream 的期货?【英文标题】:How to run futures containing borrowed TcpStream concurrently? 【发布时间】:2020-02-02 10:18:48 【问题描述】:我试图让这段代码 sn-p 并发运行而不是顺序运行,因为对等点的数量可能很大。我正在使用async_std 1.4
和锈1.41
pub struct Peer
pub peer_id: String,
pub tcp_stream: Arc<TcpStream>,
pub public_key: [u8; 32],
async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) -> Result<()>
for peer in peers.values()
let mut stream = &*peer.tcp_stream;
stream.write_all(&bincode::serialize(&message)?).await?;
Ok(())
我尝试使用futures::future::join_all
方法没有任何运气,因为包装我在async_std::task::spawn
中创建和使用的未来需要静态生命周期。这是我尝试过的:
async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>)
let handles = peers.values().into_iter().map(|peer|
task::spawn(
async
let mut stream = &*peer.tcp_stream;
if let Err(err) = stream
.write_all(&bincode::serialize(&message).unwrap())
.await
error!("Error when writing to tcp_stream: ", err);
)
);
futures::future::join_all(handles).await;
我确定我缺少一些方法,感谢您的帮助!
【问题讨论】:
【参考方案1】:由于您尝试同时发送消息,因此每个任务都必须拥有自己的消息副本:
use async_std::task, net::TcpStream;
use futures::future, io::AsyncWriteExt;
use serde::Serialize;
use std::
collections::HashMap,
error::Error,
sync::Arc,
;
pub struct Peer
pub peer_id: String,
pub tcp_stream: Arc<TcpStream>,
pub public_key: [u8; 32],
#[derive(Serialize)]
struct Protocol;
async fn send_to_all_peers(
message: Protocol,
peers: &HashMap<String, Peer>)
-> Result<(), Box<dyn Error>>
let msg = bincode::serialize(&message)?;
let handles = peers.values()
.map(|peer|
let msg = msg.clone();
let socket = peer.tcp_stream.clone();
task::spawn(async move
let mut socket = &*socket;
socket.write_all(&msg).await
)
);
future::try_join_all(handles).await?;
Ok(())
【讨论】:
谢谢!这解决了它。我希望我可以绕过克隆Arc<TcpStream>
,但我想这是不可能安全的。【参考方案2】:
你有没有尝试过类似的东西
let handles = peers.values().into_iter().map(|peer|
let mut stream = &*peer.tcp_stream;
stream.write_all(&bincode::serialize(&message).unwrap())
let results = futures::future::join_all(handles).await
?
注意 .map 闭包如何不等待,而是直接返回一个未来,然后将其传递给 join_all,然后等待。
【讨论】:
我已经尝试过另一个生命周期问题,这是错误的基本要点:rust stream.write_all(message) returns a value referencing data owned by the current function
以上是关于如何同时运行包含借用 TcpStream 的期货?的主要内容,如果未能解决你的问题,请参考以下文章