从 rust 中的通道迭代器获取第一个接收到的值

Posted

技术标签:

【中文标题】从 rust 中的通道迭代器获取第一个接收到的值【英文标题】:Get the first received value from an iterator of channels in rust 【发布时间】:2021-04-14 06:43:30 【问题描述】:

我有一个 futures::channel::mpsc::UnboundedReceiver<T> 的迭代器。我想处理接收者的每一个答案,一次只处理一个,同时还要处理其他未来。

这应该可以通过循环 futures::select! 来实现。但我需要某种方式从UnboundReceiver<T> 中获取解析值。 我尝试使用futures::future::select_all(Iter),但编译失败,错误为:futures::channel::mpsc::UnboundedReceiver<T> is not a future

游乐场示例是here。

【问题讨论】:

【参考方案1】:

futures::channel::mpsc::UnboundedReceiver 实现Stream 但不是未来,因此您可以通过调用futures::stream::select_all(recv) 创建SelectAll,然后通过调用select_all.next() 解析到下一条就绪消息. 我通过使用它来调整你的例子:

use futures::channel::mpsc, stream::self, StreamExt, select_all; // 0.3.8
use tokio; // 1.0.1

#[tokio::main]
async fn main() -> failure::Fallible<()> 
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();
    for _i in 0..3 
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move 
            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
            tx.unbounded_send("Message").unwrap();
        ));
    
    let mut select_all = select_all(recv);
    loop 
        futures::select! 
            msg = select_all.next() => 
                println!(":#?", msg);
            
            _ = futures.select_next_some() => 
                eprintln!("Thread died");
            ,
            complete => break
        
    
    Ok(())

请注意,这不是多线程,而是异步编程,您会生成异步 tokio 任务而不是线程。 我建议在这里阅读答案:What is the difference between asynchronous programming and multithreading?

【讨论】:

以上是关于从 rust 中的通道迭代器获取第一个接收到的值的主要内容,如果未能解决你的问题,请参考以下文章

从Rust中的迭代器填充切片的最佳方法是什么? [重复]

从 Rust 中的多个音频流中并行获取相同大小的块

获取下拉框第一个选项的值最后一个选项的值第二个选项的值

归并方法

IBM MQ 从接收通道获取数据

微信小程序将自定义组件获取的值传递给页面