使用 C++11 异步特性的管道数据流

Posted

技术标签:

【中文标题】使用 C++11 异步特性的管道数据流【英文标题】:Pipeline dataflow using C++11 async features 【发布时间】:2013-07-17 18:08:57 【问题描述】:

我正在尝试实现具有以下功能的多线程管道数据流框架:

    管道可以描述为无环有向图。每个节点执行一些处理,并具有任意数量的任意类型的输入和一个任意类型的输出。

    对于每个给定的输入数据实例,每个节点的执行次数不应超过一次,之后应缓存结果。尽管此缓存不应在内存中保留所需的时间,并且应在任何其他节点不再需要时将其删除。

    每个节点都应支持惰性求值,即仅在其他节点需要其输出时执行。

是否可以通过使用 C++11 多线程特性来实现这一点,尤其是 std::futurestd::promisestd::async?谁能给点线索?

【问题讨论】:

管道是否可以复用?也就是说,您是否想要一个描述管道的结构,然后每个输入令牌实例化一次以运行它? 是的,我就是这么想的。 【参考方案1】:

我相信使用async 框架实际上是相当简单的。

如果您查看std::launch,您会发现有一个延迟模式:

std::launch::deferred:任务在第一次请求结果时在调用线程上执行(惰性求值)

因此,您可以启动任务并仅在需要结果时才执行它。但是,由于您提到了无环图,您可能希望共享结果:不能共享 std::future(通过调用 std::async 返回);为此,您需要 std::shared_future

因此,总而言之:

// Disclaimer:
// Compiles but does not run, but I have not figured it out.
// See: http://ideone.com/XZ49Dg

#include <future>
#include <iostream>

int main() 
    std::shared_future<std::string> greeting = std::async(std::launch::deferred, []() 
        std::string s;
        std::cout << "Enter how you would like to be greeted: ";
        std::getline(std::cin, s);
        return s;
    );

    std::shared_future<int> choice = std::async(std::launch::deferred, []() 
        int c = 0;
        std::cout << "Pick any integer: ";
        std::cin >> c;
        return c;
    );

    std::shared_future<void> complete = std::async(std::launch::deferred, [=]() mutable 
        std::string const g = greeting.get();
        int const c = choice.get();

        std::cout << "Hello " << g << ", you picked " << c << "!\n";
    );

    complete.wait();

【讨论】:

感谢您的回答!尽管尚不清楚应如何处理不同的输入令牌以及将来不再需要时如何清除存储的值(即在将执行传递到后续阶段后应清除内存的顺序管道中)。 shared_future 应该以某种方式实例化并在图形本身内部传递,而不是作为全局变量。不过不知道该怎么做。

以上是关于使用 C++11 异步特性的管道数据流的主要内容,如果未能解决你的问题,请参考以下文章

无法使用带有 DCE 管道的异步 RPC 将数据推送到服务器

如何检查数据是不是已完成加载以使用异步管道 Angular 8

使用异步管道从可观察对象中读取对象字段

使用异步管道显示从应用程序状态检索到的 Observable 数据

使用角度异步管道以完全反应式方式刷新数据

离子生命周期取消/订阅 Firebase 数据库流(使用异步管道)