Scala并发编程

Posted 小熊的技术之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Scala并发编程相关的知识,希望对你有一定的参考价值。

我之前总结过一篇的文章,可以看到Java本身的多线程开发是挺复杂的,涉及到很多底层的并发组件,例如锁,信号量等,一般来说使用这种方式写出的代码适合底层库的实现,但是对于业务代码,更合适的选择是使用一些并发编程框架,这个在《七周七并发模型》里面也有阐述,这篇文章我的目的是总结一下最近1-2年使用Scala并发编程的感受以及基础的Scala并发知识点。


Scala的并发编程相对于Java更加的抽象,开发效率要高不少,这得益于Scala的异步编程模型:Future和Promise,基于这两个可以写出功能强大且不容易出错的代码。当然更加强大的是Scala的Actor模型,也就是AKKA,不过这个不在本文阐述。下面分为几个步骤总结:

  1. 理解Future

  2. 理解Promise

  3. Future和Promise的实现原理


理解Future


Future的定义

Future的英文含义是:未来,在Java系后端开发中,Future一般指一个未来某个时间点能够获取的数据,而这个过程中会发生异步计算(一般情况下是从线程池获取另一个线程进行计算)。Scala就是通过Future及相关类这种高维抽象把异步计算封装起来,使之可以方便的被开发者使用。下面是一个定义(来自参考文献1):


A Future is an object holding a value which may become available at some point


Java的Future是随着JDK1.5一起推出的,目的是通过引入Future来简化多线程开发,它在 java.concurrent.Future包中,但是它对异步编程还是不够友好,其主要问题在于需要get才能获取这个Future的值,而get操作会阻塞当前线程。相比之下,Scala的Future可以不需要get就能够通过回调来处理Future获得的值。同时Scala还通过ExecutionContext保证线程管理,整个使得Scala本身的异步编程非常方便。(注意:Jdk 1.8引入了CompletedFuture使得Java里面Future的使用也挺方便了)

所以Future的推出就是为了方便的处理异步逻辑。一个基本的问题是,为何需要异步化?我们试想,假如线程池有10个线程,如果代码都是同步的,那么每个线程都有一大段的时间(特别是微服务盛行的今天)在等待IO之类的操作,假如线程池的大部分线程都被阻塞了,那么即使后面有任务,CPU也没有线程可以调度,只能等待现有的线程被释放,这会导致其他线程“饿死“,影响整个系统的吞吐量。你要提高吞吐量只能不断加大驻留的线程数量,而这个是有上线的。但如果是异步代码,那么这10个主线程可以一直处于高速运转接受请求的状态,所有的异步计算,等待逻辑交由其他线程处理,这样的系统设计吞吐量会大的多,CPU的利用率会更高。换言之,我们的设计目标是尽可能接收用户请求,而不是一定要把请求处理完成再接收其他的请求。介于此我们可以实现响应式web程序(代表是 Play),这方面的更多信息可以参考这个,这篇来自Spring的文章对响应式编程的来龙去脉都有所讲解,强烈推荐。本文仅仅探讨Scala的Future的一些基础知识,使用方法以及一些需要理解的点。


Future的状态

Scala的Future根据之前定义就是用于存放未来某个时间点可以获取的值的对象。这个值按照定义有两种类型:

  1. 未完成:如果在某个时间点计算没有完成,则是未完成状态;

  2. 已完成:如果某个时间点计算已经执行了,并且有结果(成功或者异常),就是已完成状态。它又包含两种状态:

    1. 成功:如果计算成功了,就是成功状态;

    2. 失败:如果计算过程中抛了异常,则是失败状态;


怎么创建Future?

创建Future的方法有很多种,最简单的方式是使用Future.Apply方法

Future { timeCostWork()}

下面是apply方法(Scala.Concurrent.Future Object)的签名:

def apply[T](body : => T)(implicit executor : scala.concurrent.ExecutionContext) : scala.concurrent.Future[T]

我们看到,这个方法接收一个函数用来异步计算,以及一个ExecutionContext来处理线程管理逻辑,整个方法返回一个Future对象。

Future Object里面也有其他创建Future的方法,例如Future.successful可以直接创建一个成功的future,具体可以仔细研究下Future Object。


Future的重要伙伴ExecutionContext

在之前创建Future的小节里面提到了创建Future的方法之一是使用apply,而这个方法必须要一个隐式的ExecutionContext,那么到底什么是ExecutionContext?

下面是一个基本的定义;


An ExecutionContext is similar to an Executor: it is free to execute computations in a new thread, in a pooled thread or in the current thread。


这里提到的Executor是java.util.concurrent.Executor,它是Java的线程管理的抽象。Java这么做的目的是把线程的执行和线程的管理分开,让计算代码只考虑计算的业务问题而不必理会线程的管理细节。ExecutionContext是Java的Executor在Scala的定义,所以可以把ExecutionContext想象成一个自动管理的线程池,我们把任务交给它就会运行,我们只需要处理好任务本身的业务逻辑就行,这样处理还有一个好处,就是不用频繁的创建线程,这样可以节省系统开销。Scala的Future底层是基于JDK的ForkJoinPool,它也是一种Executor实现


Future的Callback机制

我们知道Java的老Future需要get来阻塞线程以获得结果,相比之下scala的则不需要,为什么可以这样?因为回调机制。Scala允许开发者在一个Future上设置回调,在任务完成之后会自动的执行回调里面的代码,这样的机制真正实现了完全的异步,使得整个系统执行效率得以提高。

一个常见的回调方法有:                                    

 OnComplete(Scala.concurrent.Future class):

def onComplete[U](f : scala.Function1[scala.util.Try[T], U])(implicit executor : scala.concurrent.ExecutionContext) : scala.Unit

这个方法接收一个Try参数,返回一个U类型,也就是说如果Future成功,参数是Success否则是Failure

除了OnComplete,Scala还有不少其他回调函数,且这些函数本身是函数式的(没有副作用),所以很容易就能嵌套使用。我们可能经常使用的组合函数有:

  1. map

  2. flatMap

  3. foreach

  4. filter

  5. foldLeft/foldRight

  6. andThen

大家妥善使用这些组合函数可以写出功能强大且优雅的代码。有一个问题需要注意,Future组合在一起,假如发生异常是怎么处理的?如果该Future嵌套的Future成功完成,那么该Future可以正确拿到嵌套Future的值;如果嵌套Future发生了异常,那么Future获得的就是失败的Future。


关于Future在For中的并发

在Scala中,如果combinator比较复杂,可以使用For语句来简化,一个常见的操作如下:

for { a <- Future {println(s"${Thread.currentThread().getName} 1") Thread.sleep(1000)} b <- Future {println(s"${Thread.currentThread().getName} 2") Thread.sleep(300)} c <- Future { println(s"${Thread.currentThread().getName} 3") Thread.sleep(300)}} yield { println(s"${Thread.currentThread().getName} 4 ")}

正常情况你要依赖a,b,c的结果需要a.flatMap b.FlatMap在c.map,这样太啰嗦,直接for一下就搞定了,所以其实for是通过map,flatMap和withFilter实现的。这样一转换整个代码清爽简洁了不少。

不过这里有一个可能的坑,就是for里面的Future.apply不是并发的,也即a,b,c这三个future是按顺序执行的,所以如果对性能要求高请不要这么写。这也是因为for的底层机制是map,它是在a的Future返回后再执行的b,因为b的Future是在a的回调中定义的。


理解Promise


Promise的定义

Promises are objects that can be assigned a value or an exception only once. This is why promises are sometimes also called single-assignment variables.

Promise一个关键特性是,promise只可以被赋值一次,多次赋值会抛异常。


Promise的作用

首先,Future的实现基于Promise

下面的代码来自 Learning Concurrent Programming in Scala

def myFuture[T](b: =>T): Future[T] = { val p = Promise[T] global.execute(new Runnable {    override def run(): Unit = try { p.success(b) } catch { case NonFatal(e) => p.failure(e) } }) p.future}

通过在另一个线程去success 或者 failure一个promise,我们能够实现一个Future。


正是因为Future背后实现的原理基于Promise,我们理解了Promise就可以很方便的给Future添加功能。

例如下面的例子是让Future在一定时间后timeout,代码来自Learning Concurrent Programming in Scala:

def timeout(t: Long): Future[Unit] = { val p = Promise[Unit] timer.schedule(new TimerTask { def run(): Unit = { p.success() timer.cancel() } }, t)

p.future }

timeout(1000) foreach { case _ => println("Timed out!") }


Future和Promise的实现原理


Future和Promise的关系

我们在之前已经完成了Future和Promise的介绍,那么一个常见的问题是,它们两个到底是什么关系?我们可以用下面一段话来概括:

A promise and a future represent two aspects of a single-assignment variable, the promise allows you to assign a value to the future object, whereas the future allows you to read the value.

简而言之,你可以把Promise和Future想象成数据流的两端,数据从Promise进,从Future读。因为Promise和Future都是只能赋值一次,所以不存在脏读的问题。Scala也正是通过这两个工具实现了一个基本的并发编程的抽象,能够解决不少并发编程的问题。


Future的实现原理

为了了解Future的实现原理,我这里查看了Scala的Library:scala-library-2.11.7的源码:

当我们调用Future.apply方法时,实际上是调用impl.Future,这是一个object,实际的实现在这个obj里面,下面是Future.apply的函数实现:

def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)

接下来我们看看Impl包中Future object的实现(注释是我加的):

private[concurrent] object Future { // 线程执行的对象 class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { // 根据body的结果设置Promise的值 promise complete { try Success(body) catch { case NonFatal(e) => Failure(e) } } } } def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { val runnable = new PromiseCompletingRunnable(body) // 通过ExecutionContext来分配线程执行runnable中的代码 executor.prepare.execute(runnable) runnable.promise.future }}

这段代码跟我们之前自己实现Future大同小异,通过ExecutionContext处理一个Runnable的对象,这个对象的run方法是被线程池里的线程调度执行的。通过这段代码,我们了解到调用Future.apply之后,其apply中的代码就被提交给线程池执行了。

所以,Future本身很简单,但是要理解Scala的Future最关键的回调注册执行的机制,就需要先理解Promise了:


Promise的实现原理

Scala的Promise是有状态的,这个状态有三种可选(来自scala-library-2.11.7中DefaultPromise的注释):

*  A DefaultPromise has three possible states. It can be:

*

*  1. Incomplete, with an associated list of callbacks waiting on completion.

*  2. Complete, with a result.

*  3. Linked to another DefaultPromise.

1 未完成状态,这个时候会关联一个回调列表,在Promise被完成的时候向线程池申请资源执行这些回调列表;

2 完成状态,就是这个promise已经被赋值了,已经完成了,那么就会关联一个值;

3 链接到其他DefaultPromise,据介绍是为了处理flatMap的内存泄漏设置的,具体先可以不用理解;

Scala默认的Promise是DefaultPromise实现的,promise本身实现了Future的接口,所以真正的Future对象其实是DefaultPromise。


Future的回调注册是如何实现的?

有了上面的基础,我们先看下DefaultPromise状态的初始值:

updateState(null, Nil) // The promise is incomplete and has no callback

这行代码来自DefaultPromise的构造函数,也就是说一个Promise在被初始化之后,是未完成状态,被关联一个空列表。

接着,我们来试图通过代码理解Future的回调注册机制,这个机制可以分为两块:

1 回调执行;

2 回调注册;

我们先来理解回调执行,我们仔细看Impl.Future中线程执行的代码,会发现promise是通过调用complete来结束的,那么这个complete会做什么?

def complete(result: Try[T]): this.type = if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")

从代码中,我们看到Promise只能被complete一次,多次complete会抛一个IllegalStateException。同时,我们看到它调用的是tryComplete,这个tryComplete的实现如下:

def tryComplete(value: Try[T]): Boolean = { val resolved = resolveTry(value) tryCompleteAndGetListeners(resolved) match { case null => false case rs if rs.isEmpty => true case rs => rs.foreach(r => r.executeWithValue(resolved)); true }}

这个主要是看执行的代码结果是否成功,接着调用tryCompleteAndGetListeners:

@tailrecprivate def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = { getState match { case raw: List[_] => val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v) case _: DefaultPromise[_] => compressedRoot().tryCompleteAndGetListeners(v) case _ => null }}


到这里就很清晰了,如果这个Promise的状态关联了列表(未完成状态),则首先把Promise设置成完成状态并关联完成的值,接着返回回调列表;如果是代理到另一个Promise,则继续调用另一个promise的函数,否则返回一个null。

而这个回调列表会被tryComplete通过executeWithValue提交给线程池执行。

接下来,我们再来看下注册流程,以map函数为例,它的代码在Future的Object里面:

def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) val p = Promise[S]() onComplete { v => p complete (v map f) } p.future}

这个map同样通过promise来返回新的Future,接着就是调用Future的OnComplete, 在等待当前Future执行结束后,执行f函数返回值。

我们看下DefaultPromise类中onComplete的实现:

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val preparedEC = executor.prepare() val runnable = new CallbackRunnable[T](preparedEC, func) dispatchOrAddCallback(runnable)}


上面的代码反应回调也是要申请一个新的线程来执行,接着,我们看看dispatchOrAddCallback:

private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { getState match { case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable) case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) }}

如果当前的Promise是已完成状态,则直接执行这个注册;如果是未完成,会先通过updateState来把当前的回调添加到state的回调列表中,注意(这里的updateState用的CAS,所以失败了会继续重试)

我们通过图表来总结下,回调执行和回调注册的流程:




总结


通过上面的研究,我们不难发现,Scala的Future本质上是Promise,而Promise是更高层次的抽象,开发者通常接触的是Future,如果要更深层次了解才会接触到Promise。不管怎样,Scala的Future通过强大的注册回调机制,使得Scala原生的(未使用AKKA)的多线程代码写起来就足够的简洁优雅,当然如果代码很复杂,多层回调看起来还是挺痛苦的,这个时候,就该轮到AKKA上场了。

下面是相关的思维导图:



参考资料:


  1. https://docs.scala-lang.org/overviews/core/futures.html

  2. https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

  3. https://spring.io/blog/2016/06/07/notes-on-reactive-programming-part-i-the-reactive-landscape

  4. http://www.reactive-streams.org/

  5. https://www.reactivemanifesto.org/zh-CN

  6. Learning Concurrent Programming in Scala


以上是关于Scala并发编程的主要内容,如果未能解决你的问题,请参考以下文章

linux打开终端如何启动scala,如何在终端下运行Scala代码片段?

大数据学习:Scala隐式转换和并发编程(DT大数据梦工厂)

Scala 学习 并发编程模型Akka

scala当中的Actor并发编程

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段

scala并发编程原生线程ActorCase Class下的消息传递和偏函数实战