解决并发编程之痛的良药--结构化并发编程

Posted 高可用架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解决并发编程之痛的良药--结构化并发编程相关的知识,希望对你有一定的参考价值。

作者简介:曹家锋,Westar实验室技术专家。Westar实验室( westar.io),成立于 2018 年,关注于区块链及分布式前沿技术,包括区块链分层架构、二层路由,网络性能、智能合约、PoW 优化等。


并发,是程序员在日常编程中难以绕开的话题,本文介绍一种并发编程范式-结构化并发(Structured Concurrency)。首先给出它的概念和现状,然后着重介绍 Rust 的一个实现 - task_scope,最后给出一个例子展示如何在实践中使用。


The Pain of concurrency programming


熟悉 Go 语言的朋友都知道,可以通过 go myfunc() 轻易的创建一个和当前协程并发执行的 task。但是,当程序变复杂, go statement 变的越来越多时,就会遇到各种 task 生命周期的问题。

  • 这个 task 什么时候开始,什么时候结束?

  • 怎么做到当所有 subtask 都结束,main task再结束?

  • 假如某个 subtask 失败,main task 如何cancel 掉其他subtask?

  • 如何保证所有 subtask 在某个特定的超时时间内返回,无论它成功还是失败?

  • 更进一步,如何保证 main task 在规定的时间内返回,无论其成功还是失败,同时 cancel 掉它产生的所有 subtask?

  • main task 已经结束了,subtask 还在 running,是不是存在资源泄漏?


以上只是我根据自己过往的经验,随便列举的几类问题。当然这些问题在 Golang 里面都是可以解的,具体可以参考 Golang Official Blog 里几篇讲 Golang Concurrency Patterns 的文章。它需要程序按照一些特定的行为方式去组织,比如说方法参数带上 Context,通过它去传递 cancellation 信号。

Go Concurrency Patterns: Context[1]

Go Concurrency Patterns: Pipelines and cancellation[2]

Go Concurrency Patterns: Timing out, moving on[3]

Advanced Go Concurrency Patterns[4]

在多线程模型中,上面几个问题给程序员带来了更多复杂性和更重的心智负担。我相信大部分 Java 程序员都无法准确的把上面几个问题都解决掉,这不是嘲讽,而是线程模型本身给使用者带来的诸多问题,这对使用者的要求实在是太高了。

那么,有没有一种编程范式,既可以解决这些问题,又具有相对比较低的认知门槛,同时也不需要像 Golang Context 那样侵入应用程序的接口?结构化并发(Structured Concurrency) 就是这样一种并发编程范式。


Structured Concurrency


2016年,ZerMQ 的作者 Martin Sústrik 在他的文章[5]中第一次形式化的提出结构化并发这个概念。2018 年 Nathaniel J. Smith (njs) 在 Python 中实现了这一范式 - trio[6],并在 Notes on structured concurrency, or: Go statement considered harmful[7] 一文中进一步阐述了 Structured Concurrency。同时期,Roman Elizarov 也提出了相同的理念[8],并在 Kotlin 中实现了大家熟知的kotlinx.coroutine[9]。2019年,OpenJDK loom project 也开始引入 structured concurrency,作为其轻量级线程和协程的一部分。


废话这么多,一方面是想提供更多的历史,方便读者更深入的了解,另一方面也是想说明,结构化并发虽然还是一个比较新的概念,具体的细节也在不断演进中,但已经有成熟的工业界实现,读者可以在自己熟悉的语言中应用该范式。

lidill(C): http://libdill.org/

trio(Python): https://trio.readthedocs.io/en/stable/

kotin.coroutine: https://github.com/Kotlin/kotlinx.coroutines

Venice(Swift): https://github.com/Zewo/Venice

Structured Concurrency 核心在于通过一种 structured 的方法实现并发程序,用具有明确入口点和出口点的控制流结构来封装并发“线程”(可以是系统级线程也可以是用户级线程,也就是协程,甚至可以是进程)的执行,确保所有派生“线程”在出口之前完成。


说的可能有点抽象,举个例子。


 
func main_func() { go myfunc() go anotherfunc() <rest of program>...}


假设上图中的代码具有 structured concurrency 特性(这里用的是 golang 的语法来展示)。main_func 里,创建了两个子任务:myfunc(), anotherfunc,这里的 func 是一个控制流结构,入口就是 func 调用开始,出口是 func 调用结束,派生出来的两个子任务需要在 main_func 调用结束之前先完成。当 main_func 结束,它涉及到的资源也都会被释放掉。外部调用者无法也无需感知 main_func 里面到底是串行的还是并行的,它只需要调用 main_func,然后等待它结束即可。这就是所谓的 Structured。

 

大家应该都知道 goto 语句,一般不推荐使用它(见Dijkstra: Go To Statement Considered Harmful),使用 goto 的场景基本都可以用 if, else, for loop, while loop 这些控制结构组合表达,可以把这些控制结构叫做 structured statement。

Structured Concurrency 的概念和 structured statement 类似,通过控制流来保证并发语义的可控,而不是 go coroutine 满天飞。

关于这方面的类比,njs 在 Notes on structured concurrency, or: Go statement considered harmful 中做了详细的说明,推荐阅读。

以上是 Structured Concurrency 的核心概念,看起来是不是很简单。下面就跟着我去看看,在 Rust 里你可以怎样实现 Structured Concurrency。


Implement Structured Concurrency in Rust


目前 Rust 并没有一个成熟的库支持 Structured Concurrency 的编程范式。但是 tokio#1879[10] 这个 issue 中已经开始讨论了如何在 tokio 这个底层库中提供支持,以实现 structured concurrency 风格的编程。如果你比较感兴趣,欢迎去这里贡献你的力量。


本节以 Rust 社区另外一个库 - task_scope 来介绍这种编程范式。task_scope 是一个日本小哥写的一个试验性质的库。在阅读和试验它时,我认为它提供的接口在使用上很别扭,不便于实现更复杂的并发逻辑,于是基于自己的经验,我把它的对外接口抽象成 Scope 和 CancelScope。这两个概念是继承自 trio 的实现,Scope 对应 trio 的 nursery,CancelScope 对应 trio 的 cancel_token。fork 版本见 startcoinorg/task_scope[11]。


Scope


为了将具有 Structured Concurrency 行为的代码与普通的异步代码区别出来,我在 task_scope 中引入了Scope 这个实体。所有 structured concurrency 的异步代码都必须在 Scope 的作用域中完成。下面给出用 task_scope 实现之前例子的伪代码。


 
let scope = Scope::new();scope.run(|spawner| async { spawner.spawn(myfunc()); spawner.spawn(anotherfunc()); <rest of program>...}).await;


Scope 作为 Structured Concurrency 的控制结构,任何想要进行 structured concurrency 编程的代码都必须初始化出一个 Scope 对象,调用 Scope.run 打开了这个控制结构的入口,在控制结构里面,可以随意的 spawn 子任务。myfunc 和 anotherfunc 都是运行在这个 scope 里。没有 Scope,父任务无法开启新的子任务,这保证了 Scope 是 Structured Concurrency 的唯一入口。最重要的是,只有当所有子任务都结束时,父任务才会结束,如果父任务在子任务结束前,就已经执行完自己的代码块,那么它需要暂停自己,并等待所有子任务结束。


 

Golang 的 go 语句最主要的问题是,当你调用了一个函数,并且函数返回了,然而你不知道它是否开启了一个/些后台任务,这些后台任务在函数返回后也不会结束(无论是有意的还是无意的)。这打破了函数的抽象,破坏了它的封装性。通过 Scope 抽象的 Structured Concurrency,就没有这个问题。任何一个函数都可以通过 scope 来运行多个并发的任务,但是函数无法返回,除非所有的子任务都完成了。因此,当一个函数返回了,你知道它是真的返回了,而不会有其他遗漏的子任务。


Timeout and Cancellation


还记得我们在开篇提到的几个问题吗,里面涉及到超时和取消:如何保证 main task 在规定的时间内返回,无论其成功还是失败,同时 cancel 掉它产生的所有 subtask? 这一节,我们来聊聊这个问题。


我在 task_scope 中为 Scope 提供了一个方法 pub fn cancel_scope(&self) -> CancelScope,来获取这个 scope 的 cancel_token。调用 CancelScope.cancel/force_cancel 方法可以取消正在执行的 scope,前者给予 scope 一定的机会做优雅退出,后者则没有。以下是一个更加完善的例子,加入 cancel scope 的概念。


 
let scope = Scope::new();let cancel_token = scope.cancel_scope();scope.run(|spawner| async { let handle1 = spawner.spawn(myfunc()); let handle2 = spawner.spawn(anotherfunc()); <rest of program>... let result = select(handle1, handle2, delay(1000)).await; if let Err(Timeout) = result { cancel_token.cancel(); }}).await;


main task 的最后,加入一个超时判断,select(handle1, handle2, delay(1000)).await,如果 handle1 和handle2 都没有在 delay(1000) 之后完成,那么就返回 Timeout,然后通过 cancel_token.cancel()取消scope的执行,这会导致 scope 里所有 child tasks 都收到 Cancel 信号,这些 child task 在下一次被调度器调度执行时,会直接退出执行。(task_scope 无法打断正在被调度器执行的 future,所以只能等到 future yield 后,下次被调度时退出,也就是说,future 中 await 的地方就是 cancel 信号的 checkpoint)。


 

Scope in Scope


当并发逻辑变得复杂,我们就会遇到在 Scope 里面开启新 Scope 的情况。一般来讲, scope 会被封装在函数里,函数的外部调用者不知道函数里是否开启了 scope。假如说,外部调用者本身是在一个 scope 里调用这个函数,就会出现 scope in scope。这种情况下,Structured Concurrency 的特性依然保持不变。


 
let scope = Scope::new();scope.run(|spawner| async { spawner.spawn(func_a()); spawner.spawn(anotherfunc()); <rest of program>...}).await;
async fn func_a() { let scope = Scope::new(); scope.run(|spawner| async { spawner.spawn(myfunc()); <rest of program>... }).await}


func_a 是一个封装有 scope 的函数,当它的子任务都完成时,它才会返回。外部调用 spawn 了 func_a 作为子任务执行,它也会等 func_a 完成后再结束。


 

timeout 和 cancellation 又如何处理的呢?当外部调用者的 scope 被 cancel 时, cancel 信号传递到每个 child task 里,child task future 检查自己是否被外部的 scope cancel 掉。


  • 如果是 graceful cancel,它会给自己的子任务也发送 graceful cancel 信号,然后继续执行或者等待,直到所有子任务都退出;

  • 如果是 force cancel,那它就给自己的子任务发送 force cancel ,然后直接退出。


这样,cancel 信号,就会通过 scope -> subscope -> sub-subscope 一层一层的往下传递,形成一个 cancel tree,通过 root 往下派发。


Practice


讲完这些基本概念,最后给读者留一个比较经典的练习题去尝试下使用 Structured Concurrency 编程。


Happy Eyeballs[12] 算法是 njs 在演示 trio 时使用的示例,他给出的 Python 实现[13]可以在这里找到。这是一个非常好的示例,强烈建议读者动手去试一试如何利用自己已有的经验去实现它(很有可能你写不出来),然后再尝试用 Strucutred Concurrency 的范式去实现。


最后,给出我用 task_scope 实现的 Rust 版本(https://github.com/starcoinorg/task_scope/blob/master/examples/happy_eyeball.rs)。


文中链接

[1] https://blog.golang.org/context

[2] https://blog.golang.org/pipelines

[3] https://blog.golang.org/go-concurrency-patterns-timing-out-and

[4] https://blog.golang.org/advanced-go-concurrency-patterns

[5] http://250bpm.com/blog:71

[6] https://trio.readthedocs.io/en/stable/

[7] https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/

[8] https://medium.com/@elizarov/structured-concurrency-722d765aa952

[9] https://github.com/Kotlin/kotlinx.coroutines

[10] https://github.com/tokio-rs/tokio/issues/1879

[11] https://github.com/starcoinorg/task_scope

[12] https://en.wikipedia.org/wiki/Happy_Eyeballs

[13] https://github.com/python-trio/trio/blob/master/trio/_highlevel_open_tcp_stream.py


参考阅读:





高可用架构

改变互联网的构建方式


以上是关于解决并发编程之痛的良药--结构化并发编程的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程实战 04死锁了怎么办?

Java并发编程实战 04死锁了怎么办?

Java编程思想之二十 并发

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段

《java并发编程实战》

JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段