C++异步:structured concurrency实现解析!

Posted 腾讯云开发者

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++异步:structured concurrency实现解析!相关的知识,希望对你有一定的参考价值。

导语 | 本篇我们将介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。

前篇《C++异步:libunifex中的concepts详解!》中我们介绍了libunifex作为框架部分的concept设计,本篇我们将在这个基础上,继续介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。

一、Structured Concurrency

我们以一个简短的示例代码开启本章的内容:

single_thread_context tcontext;
int count = 0;
schedule(tcontext.get_scheduler()) 
  | then([&]  ++count; ) 
  | sync_wait();

这段代码的表达方式前面我们也介绍过,主要使用了ranges类同的pipeline表达,我们可以简单将这种表达方式看成是C++的一种特殊LINQ实现,一个专有的DSL,当然,作为一个DSL来说,就execution的整体设计而言,它被赋予了一些专有的特性和意义:所以,如果从一个DSL的角度来看execution的结构化 concurrency,我们容易得出类似以下的观点,对于execution的pipeline表达:


  • DSL定义(BNF组成)-首先是范式的组成,如上图所示,业务使用结构化并发表达的时候,整个范式是由Concurrency Pipeline::= Sender Factory '|' Sender Adapter '|' Receiver组成的。

  • Compiler-通常情况我们可以将|操作以及connect()加起来成是编译过程, 借由Compiler Time的特性支持, 我们可以通过connect()产生runtime所需的OperationState。

  • Execute-这个阶段就很自然了,OperationState的start()就是DSL本身执行的入口点,当然,执行结果最后是通过:set_value,set_error,set_done这几个receiver cpos来传递的。

本篇中我们将以这种思路结合一些Sender Factory,Send Adapter节点,以及这种结果处理节点的具体实现来展开.sync_wait()。


二、Sender Factory

各种Sender Factory cpos用于产生各类sender,前面我们也介绍过, sender最大的特征就是会触发set_value,set_error,set_done这几个用于结束通知的receiver cpos。此处我们以just() 为例,来看一下一个Sender Factory需要包含的实现内容,在后续文章中我们会再介绍另外一个schedule() cpo的实现。


(一)just实现解析

首先just(values...)的语义, 就是生成一个sender, 该sender可以向后续的节点通过receiver cpos传递values..., 我们具体来看一下libunifex的just()实现, 会比大家想的复杂一些, 这主要还是因为execution实现的整体思路就是在尝试定义一个DSL, 然后这个DSL本身是自恰的, 比如对于just()来说, 必然会包含以下几个部分:

  • sender生产方法-just() cpo本身。

  • just::sender的实现-具体的sender实现。

  • 相关的OperationState-节点可参与异步操作执行,则必然可以通过connect()来产生其OperationState对象,最后对start()作出响应。

我们来分别看一下这三部分的具体代码实现:


  • sender生产方法

constexpr auto just_cpo::operator()(Values&&... values) const 
  return _just::sender<Values...>std::in_place, (Values&&)values...;

just()的入口定义比较简单,主要是根据输入的变参values...构造一个_just::sender<Values...>对象并返回。这就是我们下一节要介绍的sender实现。

bind_back()版的operator()重载主要用于pipeline组织,代码大量雷同,本篇将统一忽略,方便源码的阅读,有兴趣的读者可以自行翻阅相关的实现。

  • just::sender的实现

_just::sender<> 其实真实类型是 _just::_sender<>::type,这个只是libunifex惯用的一种封装方式,具体的实现如下:

template <typename... Values>
class just::sender<Values...> 
 private:
  std::tuple<Values...> values_;
 public:
  template </*...*/>
  using value_types = Variant<Tuple<Values...>>;


  template </*...*/>
  using error_types = Variant<std::exception_ptr>;


  static constexpr bool sends_done = false;


  template<typename... Values2>
  explicit sender(std::in_place_t, Values2&&... values)
    : values_((Values2 &&) values...) 


  template<typename This, typename Receiver>
  friend auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)
      -> operation<Receiver, Values...> 
    return static_cast<This&&>(that).values_, static_cast<Receiver&&>(r);
  
;

这是一个很标准的sender实现,如我们在《C++异步:libunifex中的concepts详解!》中介绍的一样。首先是sender traits需要的类型定义部分,决定了sender可能触发的receiver cpos的参数和类型:

template </*...*/>
  using value_types = Variant<Tuple<Values...>>;


  template </*...*/>
  using error_types = Variant<std::exception_ptr>;


  static constexpr bool sends_done = false;

其次是通过tag_invoke定义的connect()实现:

template<typename This, typename Receiver>
  friend auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)
      -> operation<Receiver, Values...> 
    return static_cast<This&&>(that).values_, static_cast<Receiver&&>(r);
  

此处返回的operation<>也是我们下一节要介绍的just()专用的OperationState实现。

  • 相关的OperationState

template <typename Receiver, typename... Values>
struct just::operation<Receiver, Values...>::type 
  std::tuple<Values...> values_;
  Receiver receiver_;


  void start() & noexcept 
    try 
      std::apply(
          [&](Values&&... values) 
            execution::set_value((Receiver &&) receiver_, (Values &&) values...);
          ,
          std::move(values_));
     catch (...) 
      execution::set_error((Receiver &&) receiver_, std::current_exception());
    
  
;

抛开绕来绕去的alias name来说, 这个OperationState的实现很简单, 存储了传入的values...和connect()时关联的Receiver, 并且在start()时向存储的Receiver调用set_value()传递存储下来的values...


(二)本章小结

对于一个sender factory类型的cpo来说,我们始终可以将其实现简单的分成以下几部分:

  • sender生产方法-如just()。

  • sender的实现-具体的sender实现。

  • 相关的OperationState-节点可参与异步操作执行,则必然可以通过connect()来产生其OperationState对象,最后对start()作出响应。因为用于产生一个sender,这类节点一般都出现在structured concurrency描述的最左侧,负责作为后续节点的数据来源,如最开始的示例代码中那样。

三、Sender Adapter

首先我们知道Sender Adapter是作为中间节点存在的:

Concurrency Pipeline ::= Sender Factory  '|' Sender Adapter  '|' Receiver

我们先来看一下Sender Adapter语义层面的特征:

  • Sender Adapter是Sender的包装器,接收前置Sender对象后形成新的Sender对象。

  • 新的Sender对象有自己的异步类型定义,同样也通过receiver cpos向后续节点传递异步操作结果。

Sender Adapter其实就像一个filter,它对原始的异步处理结果进行加工,产生新的结果,大致的工作情况如下图所示:

如上图所示,对于一个Sender Adapter定义,至少会包含两个对象:

Internal Receiver-用于接收Previous Sender发送的结果,处理自己的逻辑后再将结果发往后续节点。

Internal Sender-SenderAdapter(Sender,args...)形成一个新的Sender,连接到后续节点。当然,还会有一个用于作为入口的cpo。

我们具体以比较常用的then()的实现来具体看一下libunifex中一个典型的Sender Adapter是如何实现的:


(一)then() cpo

then()节点的作用是从上一个节点中获取异步返回值后,用该返回值作为输入值调用传入then()节点的函数后,将调用结果作为异步操作的结果返回后续节点,简单的图示如下:

对应的代码实现为:

template<typename Sender, typename Func>
auto then_cpo::operator()(Sender&& predecessor, Func&& func) const
  -> _result_t<Sender, Func> 
  return execution::tag_invoke(_fn, (Sender&&)predecessor, (Func&&)func);



template<typename Sender, typename Func>
auto then_cpo::operator()(Sender&& predecessor, Func&& func) const
  -> _result_t<Sender, Func> 
  return _then::sender<Sender, Func>(Sender &&) predecessor, (Func &&) func;

;

then()调用的处理区分了传入的Func是否可tag_invoke的判断,我们直接看最通常的情况,传入的是普通函数:

template<typename Sender, typename Func>
auto then_cpo::operator()(Sender&& predecessor, Func&& func) const
  -> _result_t<Sender, Func> 
  return _then::sender<Sender, Func>(Sender &&) predecessor, (Func &&) func;

最后返回的是一个_then命名空间下定义的_then::sender<>对象,并且这个对象将前置的Sender对象和传入的func作为构造这个对象的参数。我们来看一下这个sender的具体实现:


(二)then()的Internal Sender实现

template <typename Predecessor, typename Func>
struct then::sender<Predecessor, Func>::type 
  Predecessor pred_;
  Func func_;
 private:


  template <typename... Args>
  using result = /*unspecified*/;
public:


  template </*unspecified*/>
  using value_types = /*unspecified*/;


  template </*unspecified*/>
  using error_types = /*unspecified*/;


  static constexpr bool sends_done = sender_traits<Predecessor>::sends_done;


  template <typename Receiver>
  using receiver_t = receiver_t<Receiver, Func>;




  template<typename Sender, typename Receiver>
  friend auto tag_invoke(tag_t<execution::connect>, Sender&& s, Receiver&& r)
      -> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> 
    return execution::connect(
      static_cast<Sender&&>(s).pred_,
      receiver_t<remove_cvref_t<Receiver>>
        static_cast<Sender&&>(s).func_,
        static_cast<Receiver&&>(r));
  
;

跟我们前面看到的just()内的sender实现一样,包含了基本的sender types定义,以及sender相关的connect()tag_invoke定义:

template<typename Sender, typename Receiver>
  friend auto tag_invoke(tag_t<execution::connect>, Sender&& s, Receiver&& r)
      -> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> 
    return execution::connect(
      static_cast<Sender&&>(s).pred_,
      receiver_t<remove_cvref_t<Receiver>>
        static_cast<Sender&&>(s).func_,
        static_cast<Receiver&&>(r));
  

我们可以看到,对then()的sender进行connect()的时候,真正发生connect()的是我们之前在then(Previous Sender,Func)调用时缓存下来的上一节点,以及新构建出的receiver_t<>对象,这个对象也是Func真正被执行的地方,同时这个对象也保存了后续的Reciver节点,方便向后续节点传递异步执行结果。


(三)then()的Internal Receiver实现

template <typename Receiver, typename Func>
struct then::receiver_t<Receiver, Func>::type 
  Func func_;
  Receiver receiver_;


  template <typename... Values>
  void set_value(Values&&... values) && noexcept 
    using result_type = std::invoke_result_t<Func, Values...>;
    if constexpr (std::is_void_v<result_type>) 
      if constexpr (noexcept(std::invoke(
                        (Func &&) func_, (Values &&) values...))) 
        std::invoke((Func &&) func_, (Values &&) values...);
        execution::set_value((Receiver &&) receiver_);
       else 
        try 
          std::invoke((Func &&) func_, (Values &&) values...);
          execution::set_value((Receiver &&) receiver_);
         catch (...) 
          execution::set_error((Receiver &&) receiver_, std::current_exception());
        
      
     else 
      if constexpr (noexcept(std::invoke(
                        (Func &&) func_, (Values &&) values...))) 
        execution::set_value(
            (Receiver &&) receiver_,
            std::invoke((Func &&) func_, (Values &&) values...));
       else 
        try 
          execution::set_value(
              (Receiver &&) receiver_,
              std::invoke((Func &&) func_, (Values &&) values...));
         catch (...) 
          execution::set_error((Receiver &&) receiver_, std::current_exception());
        
      
    
  


  template <typename Error>
  void set_error(Error&& error) && noexcept 
    execution::set_error((Receiver &&) receiver_, (Error &&) error);
  


  void set_done() && noexcept 
    execution::set_done((Receiver &&) receiver_);
  
;

到receiver的实现这里就很自然了,通过set_value()接受前面的Sender传递过来的结果,将结果作为输入参数调用Func后,再通过set_value()向后续节点传递Func的返回值。


(四)本章小结

对于一个Sender Adapater类型的cpo来说,主要需要完成以下几件事情:

  • 入口cpo(如then())-完成对前置Sender的接收和需要的参数的接收处理,创建一个专用的Internal Sender对象并返回。

  • Internal Sender-存储前置Sender和需要的参数,并实现tag_invoke(tag_t<execution::connect>)用于构建InternalReceiver,并将实际的connect()操作重定向到保存下来的前置Sender和新创建的InternalReceiver上。

  • InternalReceiver-获取前置Sender的异步结果,并在处理自身逻辑后,将最终的结果返回给后续节点。整体上来说可以将这看成一种wrapper机制, set_value是拦截点,在拦截点上插入自身逻辑,最后依然还是通过set_value返回下一步需要的异步执行结果。


四、sync_wait_r()与sync_wait()

libunifex的实现并没有提供一个类似default receiver的节点,但提供了工具节点sync_wait_r()和sync_wait(),当然,除了通过这种方式来处理返回结果,你也可以自行实现一个自己的Receiver来接收异步返回值。本章我们主要介绍sync_wait_r()和sync_wait()的实现,通过这两者,我们也能更深入理解libunifex常规状态下是如何发起一个异步操作执行并接收其返回结果的。


(一)cpo入口

sync_wait():

template<typename Sender>
auto sync_wait_cpo::operator()(Sender&& sender) const
    -> std::optional</*...*/> 
  using Result = /*...*/;
  return _sync_wait::_impl<Result>((Sender&&) sender);

sync_wait_r():

template <typename Result>
decltype(auto) sync_wait_r_cpo::operator()(Sender&& sender) const 
  using Result2 = non_void_t<wrap_reference_t<decay_rvalue_t<Result>>>;
  return _sync_wait::_impl<Result2>((Sender&&) sender);

两者代码高度相似:

  • 输入参数都是Sender。

  • 利用_sync_wait::_impl<>来完成具体的实现。

两者的差异:

  • sync_wait_r<Result>允许业务侧指定返回值的类型,不支持pipeline操作,一般直接以sync_wait_r<Result>(Sender)的方式来使用。

  • sync_wait 直接使用传入的Sender来推导返回值类型,可以作为pipeline的终结节点使用,如just(1)|sync_wait()。

我们接下来看一看sync_wait和sync_wait_r都引用的_sync_wait::_impl的实现:

(二)sync_wait::_impl的实现

auto _impl(Sender&& sender) 
  manual_event_loop ctx;
  // Store state for the operation on the stack.
  auto operation = connect(
      (Sender&&)sender,
      _sync_wait::receiver_t<Result>promise, ctx);


  start(operation);


  ctx.run();


  // ... (retsult handling here)

整体实现比较简洁,我们主要关注几点:

  • _impl()最终的返回值类型为std::optional<Result>。

  • 整个函数的实现完成了前面的们提到的connect()产生OperationState,再执行start()的过程。

  • connect()时与传入的Sender进行连接的Receiver是自定义的_sync_wait::_receiver<T>::type类型。

  • ctx.run()等待最终执行的完成(相关详细分析可参考后续文章)。

  • 根据promise.state_记录的类型对返回值进行处理(正确返回值还是抛异常)。

剩下的就只有_sync_wait::receiver_t<>的实现了,我们接着来看一下这部分的实现:


(三)_sync_wait::receiver_t<>的实现

template <typename T>
struct sync_wait::receiver_t 
    promise<T>& promise_;
    manual_event_loop& ctx_;


    template <typename... Values>
    void set_value(Values&&... values) && noexcept 
      try 
        execution::activate_union_member(promise_.value_, (Values&&)values...);
        promise_.state_ = promise<T>::state::value;
       catch (...) 
        execution::activate_union_member(promise_.exception_, std::current_exception());
        promise_.state_ = promise<T>::state::error;
      
      signal_complete();
    


    void set_error(std::exception_ptr err) && noexcept 
      execution::activate_union_member(promise_.exception_, std::move(err));
      promise_.state_ = promise<T>::state::error;
      signal_complete();
    


    void set_error(std::error_code ec) && noexcept 
      std::move(*this).set_error(make_exception_ptr(std::system_errorec, "sync_wait"));
    


    template <typename Error>
    void set_error(Error&& e) && noexcept 
      std::move(*this).set_error(make_exception_ptr((Error&&)e));
    


    void set_done() && noexcept 
      promise_.state_ = promise<T>::state::done;
      signal_complete();
    
  private:
    void signal_complete() noexcept 
      ctx_.stop();
    
;

这就是一个很标准的receiver实现,利用set_value,set_error,set_done的重载来完成对前置Sender执行结果的获取,通过前面的代码我们容易知道,如果是无异常的状态,则正常的通过std::optional<>来返回执行结果,否则抛出异常。另外,代码中的signal_complete()用于通知_impl函数中的 ctx.run()返回,最终向用户返回异步操作的结果。


五、总结

本篇我们从libunifex的structured concurrency设计开始,简述了整套execution整套DSL的组织和执行的逻辑,并结合具体的:

  • Sender Factory实现举例-just()。

  • Sender Adapter实现举例-then()。

  • 终结节点-sync_wait()和sync_wait_r()加深大家对execution各类节点实现的理解。

structured concurrency的设计是整个库的核心,理解了它,也能方便我们理解一些基础节点的实现,也为自己定制更多业务化的节点提供良好的基础。这也是为什么execution库也被当成一个库作者向的特性的原因,与其说它是一个异步库,不如说它在尝试定义一套从DSL到执行态都比较完备的c++异步专用语言。当然,后者的学习成本比学习一个库明显会高出比较多。

参考资料:

1.libunifex源码库

 作者简介

沈芳

腾讯后台开发工程师

IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。

 推荐阅读

C++异步:libunifex中的concepts详解!

图文并茂!带你深度解析Kubernetes

万卷共知,一书一页总关情,TVP读书会带你突围阅读迷障!

C++异步变化:libunifex实现!

温馨提示:因公众号平台更改了推送规则,公众号推送的文章文末需要点一下“赞”和“在看”,新的文章才会第一时间出现在你的订阅列表里噢~

以上是关于C++异步:structured concurrency实现解析!的主要内容,如果未能解决你的问题,请参考以下文章

C++文档阅读笔记-Difference Between C Structures and C++ Structures

C++文档阅读笔记-Difference Between C Structures and C++ Structures

C++文档阅读笔记-Difference Between C Structures and C++ Structures

python异步并发模块concurrent.futures入门详解

线程-线程池-concurrent.futures模块

线程-线程池-concurrent.futures模块