从 rust 中的通道迭代器获取第一个接收到的值
Posted
技术标签:
【中文标题】从 rust 中的通道迭代器获取第一个接收到的值【英文标题】:Get the first received value from an iterator of channels in rust 【发布时间】:2021-04-14 06:43:30 【问题描述】:我有一个 futures::channel::mpsc::UnboundedReceiver<T>
的迭代器。我想处理接收者的每一个答案,一次只处理一个,同时还要处理其他未来。
这应该可以通过循环 futures::select! 来实现。但我需要某种方式从UnboundReceiver<T>
中获取解析值。
我尝试使用futures::future::select_all(Iter)
,但编译失败,错误为:futures::channel::mpsc::UnboundedReceiver<T> is not a future
。
游乐场示例是here。
【问题讨论】:
【参考方案1】:futures::channel::mpsc::UnboundedReceiver 实现Stream
但不是未来,因此您可以通过调用futures::stream::select_all(recv)
创建SelectAll
,然后通过调用select_all.next()
解析到下一条就绪消息.
我通过使用它来调整你的例子:
use futures::channel::mpsc, stream::self, StreamExt, select_all; // 0.3.8
use tokio; // 1.0.1
#[tokio::main]
async fn main() -> failure::Fallible<()>
let mut recv = Vec::new();
let mut futures = stream::FuturesUnordered::new();
for _i in 0..3
let (tx, rx) = mpsc::unbounded();
recv.push(rx);
futures.push(tokio::spawn(async move
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
tx.unbounded_send("Message").unwrap();
));
let mut select_all = select_all(recv);
loop
futures::select!
msg = select_all.next() =>
println!(":#?", msg);
_ = futures.select_next_some() =>
eprintln!("Thread died");
,
complete => break
Ok(())
请注意,这不是多线程,而是异步编程,您会生成异步 tokio 任务而不是线程。 我建议在这里阅读答案:What is the difference between asynchronous programming and multithreading?
【讨论】:
以上是关于从 rust 中的通道迭代器获取第一个接收到的值的主要内容,如果未能解决你的问题,请参考以下文章