并非所有 std::packaged_task 在 std::async 调用中执行

Posted

技术标签:

【中文标题】并非所有 std::packaged_task 在 std::async 调用中执行【英文标题】:Not all std::packaged_tasks executed when inside std::async calls 【发布时间】:2021-10-19 03:53:14 【问题描述】:

我有一个相当复杂的代码,其中包含std::async 调用和std::packaged_task,但无法执行到最后。我将其简化为最小的可重现示例。

两个async函数一个接一个地被调用,其中有packaged_tasks使用std::async异步执行。然后我们都使用相应的future.wait() 方法等待两个异步函数完成。执行在futY.wait(); 处停止,第二个packaged_task 永远不会执行(没有第二个Inside handler func 日志)。

#include <iostream>     // std::cout
#include <future>       // std::packaged_task, std::future
#include <exception>
#include <vector>
#include <list>
#include <memory>
#include <functional>

std::list<std::function<bool(const std::vector<int> &data)>> handlers_;

std::future<std::vector<int>> countup(int from, int to) 
    std::function<std::vector<int>(std::exception_ptr, std::vector<int>)> func = [=] (std::exception_ptr ex, std::vector<int> data) 
        std::cout << "Inside handler func " << from << " " << to << std::endl;
        if (ex != nullptr) 
            std::rethrow_exception(ex);
        
        return data;
    ;
    auto packageP = std::make_shared<std::packaged_task<std::vector<int>(std::exception_ptr, std::vector<int>)>>(func);

    auto fut = packageP->get_future();
    handlers_.push_back([packageP] (const std::vector<int> &data) mutable -> bool 
            std::cout << "Calling handler with data, size: " << data.size() << std::endl;

            (*packageP)(nullptr, data);
            return data.size();
        
    );

    auto fut2 = std::async(std::launch::async, [=, &handlers_] 
            std::cout << "Before handler called " << from << to << std::endl;

            std::vector<int> vec ( to, from );

            auto res = (*handlers_.begin())(vec);

            std::cout << "Handler result " << res << " for " << from << " " << to << std::endl;
        );

    std::cout << "Called async in countup for " << from << " " << to << std::endl;

    return fut;


int main ()

  auto futX = std::async(std::launch::async, [] 
      auto fut1 = std::async(std::launch::async, [] 
        auto fut2 = countup(0, 2);

        std::cout << "Called X countup and waiting to finish" << std::endl;

        fut2.wait();

        auto vec = fut2.get();
        std::cout << "The X countup returned" << std::endl;
      );
      std::cout << "Called X async internal and waiting to finish" << std::endl;
      fut1.wait();

      return 2;
  );

  std::cout << "Called async X and waiting to finish" << std::endl;

  auto futY = std::async(std::launch::async, [] 
      auto fut1 = std::async(std::launch::async, [] 
        auto fut2 = countup(0, 3);

        std::cout << "Called Y countup and waiting to finish" << std::endl;
        fut2.wait();

        auto vec = fut2.get();
        std::cout << "The Y countup returned " << std::endl;
      );
      std::cout << "Called Y async internal and waiting to finish" << std::endl;
      fut1.wait();

      return 3;
  );

  std::cout << "Called async Y and waiting to finish" << std::endl;

  futX.wait();
  std::cout << "After async X and waiting to finish" << std::endl;

  futY.wait();
  std::cout << "After async Y and waiting to finish" << std::endl;

  int valueX = futX.get();                  // wait for the task to finish and get result
  int valueY = futY.get();                  // wait for the task to finish and get result

  std::cout << "The countdown lasted for " << valueX  << " " << valueY << " seconds" << std::endl;

  return 0;

日志如下:

Called async X and waiting to finish
Called async Y and waiting to finish
Called X async internal and waiting to finish
Called Y async internal and waiting to finish
Called async in countup for Before handler called 02
0 2
Calling handler with data, size: 2
Inside handler func 0Called async in countup for  2
Handler result 01 for 0 2
 Before handler called 03
3Calling handler with data, size: 
Called X countup and waiting to finish
The X countup returned
3
After async X and waiting to finish
Called Y countup and waiting to finish

能否请您解释一下为什么代码从不执行到最后?

【问题讨论】:

【参考方案1】:

当您调用 (*handlers_.begin()) 时,您正在重复使用相同的打包任务,我认为您在调用 handlers_.push_back 时遇到了数据竞争。

你不需要全局handlers_,你可以捕获一个本地处理程序。

#include <iostream>     // std::cout
#include <future>       // std::packaged_task, std::future
#include <exception>
#include <vector>
#include <list>
#include <memory>
#include <functional>

std::future<std::vector<int>> countup(int from, int to) 
    auto func = [=] (std::exception_ptr ex, std::vector<int> data) 
        std::cout << "Inside handler func " << from << " " << to << std::endl;
        if (ex != nullptr) 
            std::rethrow_exception(ex);
        
        return data;
    ;
    auto packageP = std::make_shared<std::packaged_task<std::vector<int>(std::exception_ptr, std::vector<int>)>>(func);

    auto fut = packageP->get_future();
    auto handler = [packageP] (const std::vector<int> &data) mutable -> bool 
        std::cout << "Calling handler with data, size: " << data.size() << std::endl;

        (*packageP)(nullptr, data);
        return data.size();
    ;

    auto fut2 = std::async(std::launch::async, [=]() mutable 
            std::cout << "Before handler called " << from << to << std::endl;

            std::vector<int> vec ( to, from );

            auto res = handler(vec);

            std::cout << "Handler result " << res << " for " << from << " " << to << std::endl;
        );

    std::cout << "Called async in countup for " << from << " " << to << std::endl;

    return fut;


int main ()

  auto futX = std::async(std::launch::async, [] 
      auto fut1 = std::async(std::launch::async, [] 
        auto fut2 = countup(0, 2);

        std::cout << "Called X countup and waiting to finish" << std::endl;

        fut2.wait();

        auto vec = fut2.get();
        std::cout << "The X countup returned" << std::endl;
      );
      std::cout << "Called X async internal and waiting to finish" << std::endl;
      fut1.wait();

      return 2;
  );

  std::cout << "Called async X and waiting to finish" << std::endl;

  auto futY = std::async(std::launch::async, [] 
      auto fut1 = std::async(std::launch::async, [] 
        auto fut2 = countup(0, 3);

        std::cout << "Called Y countup and waiting to finish" << std::endl;
        fut2.wait();

        auto vec = fut2.get();
        std::cout << "The Y countup returned " << std::endl;
      );
      std::cout << "Called Y async internal and waiting to finish" << std::endl;
      fut1.wait();

      return 3;
  );

  std::cout << "Called async Y and waiting to finish" << std::endl;

  futX.wait();
  std::cout << "After async X and waiting to finish" << std::endl;

  futY.wait();
  std::cout << "After async Y and waiting to finish" << std::endl;

  int valueX = futX.get();                  // wait for the task to finish and get result
  int valueY = futY.get();                  // wait for the task to finish and get result

  std::cout << "The countdown lasted for " << valueX  << " " << valueY << " seconds" << std::endl;

  return 0;

See it on coliru

【讨论】:

非常感谢@Caleth 的回答。事实上,我错误地只执行了一个包,一旦我删除了处理程序列表,问题就消失了。不幸的是,我在简化代码时引入了这个问题。至于我最初的问题,如果有任何理由在我的示例中可以在为futY 设置值之前执行此行int valueX = futX.get();,您能否分享一下? @rightaway717 你是什么意思?您有一些不确定排序的输出,但最终都以允许的顺序之一到达那里 我的意思是,由于某种原因,所有(有多个)future.wait() 调用都被解除阻塞,并且执行继续从期货 (future.get()) 中获取值,尽管并非所有 std::async 方法都适用于那些期货似乎回来了(没有日志)。代码挂在第一个future.get() 调用上。 @rightaway717 X 和 Y 发生在不同的线程上。他们可以交错。你想达到什么目的? 再次澄清一下。首先,有一系列std::async 调用将结果存储在期货中。然后我对这些期货进行一系列的 wait() 调用。因此,当所有异步函数返回所有期货时,wait() 调用必须一个一个地解除阻塞,我将通过一系列.get() 调用从期货中提取值。在所有相应的异步方法返回之前,所有期货是否有可能在 wait() 上解除阻塞?

以上是关于并非所有 std::packaged_task 在 std::async 调用中执行的主要内容,如果未能解决你的问题,请参考以下文章

为啥 std::future 从 std::packaged_task 和 std::async 返回不同?

何时在 async 或 packaged_task 上使用 promise?

std::packaged_task 真的很贵吗?

线程 packaged_task future

packaged_task 和 async 有啥区别

何时更喜欢 lambda 而不是带有 std::async 的打包任务? [复制]