如何组成异步操作?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何组成异步操作?相关的知识,希望对你有一定的参考价值。
我正在寻找一种组合异步操作的方法。最终目标是执行异步操作,并使其运行完成,或者在用户定义的超时后返回。
出于示例性目的,假设我正在寻找一种方法来组合以下coroutines1:
IAsyncOperation<IBuffer> read(IBuffer buffer, uint32_t count)
{
auto&& result{ co_await socket_.InputStream().ReadAsync(buffer, count, InputStreamOptions::None) };
co_return result;
}
与socket_
是一个StreamSocket
实例。
和超时协程:
IAsyncAction timeout()
{
co_await 5s;
}
我正在寻找一种方法来以某种方式组合这些协程,一旦数据被读取或超时已经到期,它就会尽快返回。
这些是我目前评估的选项:
- C ++ 20协同程序:据我所知P1056R0,目前没有“能够创建和组合协同程序”的库或语言功能。
- Windows运行时提供的异步任务类型最终来自IAsyncInfo:同样,我没有找到任何允许我按照我需要的方式组合任务的工具。
- Concurrency Runtime:这看起来很有希望,特别是
when_any
函数模板看起来正是我需要的。
从那看起来我需要使用并发运行时。但是,我很难将所有部分组合在一起。我对如何处理异常以及是否需要取消相应的其他并发任务感到特别困惑。
问题是双重的:
- 并发运行时是唯一的选择(UWP应用程序)吗?
- 实现会是什么样的?
1这些方法是应用程序的内部方法。它们不需要让它们返回Windows Runtime兼容类型。
我认为最简单的方法是使用concurrency
库。您需要修改超时以返回与第一个方法相同的类型,即使它返回null。
(我意识到这只是部分答案......)
我的C ++糟透了,但我认为这很接近......
array<task<IBuffer>, 2> tasks =
{
concurrency::create_task([]{return read(buffer, count).get();}),
concurrency::create_task([]{return modifiedTimeout.get();})
};
concurrency::when_any(begin(tasks), end(tasks)).then([](IBuffer buffer)
{
//do something
});
正如Lee McPherson在另一个answer所建议的那样,Concurrency Runtime看起来是一个可行的选择。它提供了tasks,可以与其他人结合使用,使用continuation链接,以及与Windows Runtime异步模型无缝集成(请参阅Creating Asynchronous Operations in C++ for UWP Apps)。作为奖励,包括<pplawait.h>
标头为concurrency::task
类模板实例提供了适配器,可用作C ++ 20协程等待。
我无法回答所有问题,但这是我最终提出的问题。为了简单(并且易于验证)我使用Sleep代替实际的读取操作,并返回int
而不是IBuffer
。
任务的组成
ConcRT提供了几种组合任务的方法。根据要求,当任何提供的任务完成时,concurrency::when_any
可用于创建返回的任务。当只提供2个任务作为输入时,还有一个便利操作员(operator||
)可用。
异常传播
从任一输入任务中引发的异常不算作成功完成。当与when_any
任务一起使用时,抛出异常将不足以满足等待条件。因此,异常不能用于打破组合任务。为了解决这个问题,我选择了返回std::optional
,并在then
延续中提出了适当的例外。
任务取消
这对我来说仍然是一个谜。看来,一旦任务满足when_any
任务的等待条件,就不需要取消相应的其他未完成任务。一旦完成(成功或其他),他们就会默默地处理。
以下是使用前面提到的简化的代码。它创建一个由实际工作负载和超时任务组成的任务,两者都返回一个std::optional
。 then
continuation检查返回值,并在没有一个时抛出异常(即timeout_task
先完成)。
#include <Windows.h>
#include <cstdint>
#include <iostream>
#include <optional>
#include <ppltasks.h>
#include <stdexcept>
using namespace concurrency;
task<int> read_with_timeout(uint32_t read_duration, uint32_t timeout)
{
auto&& read_task
{
create_task([read_duration]
{
::Sleep(read_duration);
return std::optional<int>{42};
})
};
auto&& timeout_task
{
create_task([timeout]
{
::Sleep(timeout);
return std::optional<int>{};
})
};
auto&& task
{
(read_task || timeout_task)
.then([](std::optional<int> result)
{
if (!result.has_value())
{
throw std::runtime_error("timeout");
}
return result.value();
})
};
return task;
}
以下测试代码
int main()
{
try
{
auto res1{ read_with_timeout(3000, 5000).get() };
std::cout << "Succeeded. Result = " << res1 << std::endl;
auto res2{ read_with_timeout(5000, 3000).get() };
std::cout << "Succeeded. Result = " << res2 << std::endl;
}
catch( std::runtime_error const& e )
{
std::cout << "Failed. Exception = " << e.what() << std::endl;
}
}
产生这个输出:
Succeeded. Result = 42 Failed. Exception = timeout
以上是关于如何组成异步操作?的主要内容,如果未能解决你的问题,请参考以下文章