Scala并发编程
Posted 小熊的技术之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Scala并发编程相关的知识,希望对你有一定的参考价值。
我之前总结过一篇的文章,可以看到Java本身的多线程开发是挺复杂的,涉及到很多底层的并发组件,例如锁,信号量等,一般来说使用这种方式写出的代码适合底层库的实现,但是对于业务代码,更合适的选择是使用一些并发编程框架,这个在《七周七并发模型》里面也有阐述,这篇文章我的目的是总结一下最近1-2年使用Scala并发编程的感受以及基础的Scala并发知识点。
Scala的并发编程相对于Java更加的抽象,开发效率要高不少,这得益于Scala的异步编程模型:Future和Promise,基于这两个可以写出功能强大且不容易出错的代码。当然更加强大的是Scala的Actor模型,也就是AKKA,不过这个不在本文阐述。下面分为几个步骤总结:
理解Future
理解Promise
Future和Promise的实现原理
理解Future
Future的定义
Future的英文含义是:未来,在Java系后端开发中,Future一般指一个未来某个时间点能够获取的数据,而这个过程中会发生异步计算(一般情况下是从线程池获取另一个线程进行计算)。Scala就是通过Future及相关类这种高维抽象把异步计算封装起来,使之可以方便的被开发者使用。下面是一个定义(来自参考文献1):
A Future is an object holding a value which may become available at some point
Java的Future是随着JDK1.5一起推出的,目的是通过引入Future来简化多线程开发,它在 java.concurrent.Future包中,但是它对异步编程还是不够友好,其主要问题在于需要get才能获取这个Future的值,而get操作会阻塞当前线程。相比之下,Scala的Future可以不需要get就能够通过回调来处理Future获得的值。同时Scala还通过ExecutionContext保证线程管理,整个使得Scala本身的异步编程非常方便。(注意:Jdk 1.8引入了CompletedFuture使得Java里面Future的使用也挺方便了)
所以Future的推出就是为了方便的处理异步逻辑。一个基本的问题是,为何需要异步化?我们试想,假如线程池有10个线程,如果代码都是同步的,那么每个线程都有一大段的时间(特别是微服务盛行的今天)在等待IO之类的操作,假如线程池的大部分线程都被阻塞了,那么即使后面有任务,CPU也没有线程可以调度,只能等待现有的线程被释放,这会导致其他线程“饿死“,影响整个系统的吞吐量。你要提高吞吐量只能不断加大驻留的线程数量,而这个是有上线的。但如果是异步代码,那么这10个主线程可以一直处于高速运转接受请求的状态,所有的异步计算,等待逻辑交由其他线程处理,这样的系统设计吞吐量会大的多,CPU的利用率会更高。换言之,我们的设计目标是尽可能接收用户请求,而不是一定要把请求处理完成再接收其他的请求。介于此我们可以实现响应式web程序(代表是 Play),这方面的更多信息可以参考这个,这篇来自Spring的文章对响应式编程的来龙去脉都有所讲解,强烈推荐。本文仅仅探讨Scala的Future的一些基础知识,使用方法以及一些需要理解的点。
Future的状态
Scala的Future根据之前定义就是用于存放未来某个时间点可以获取的值的对象。这个值按照定义有两种类型:
未完成:如果在某个时间点计算没有完成,则是未完成状态;
已完成:如果某个时间点计算已经执行了,并且有结果(成功或者异常),就是已完成状态。它又包含两种状态:
成功:如果计算成功了,就是成功状态;
失败:如果计算过程中抛了异常,则是失败状态;
怎么创建Future?
创建Future的方法有很多种,最简单的方式是使用Future.Apply方法
Future {
timeCostWork()
}
下面是apply方法(Scala.Concurrent.Future Object)的签名:
def apply[T](body : => T)(implicit executor : scala.concurrent.ExecutionContext) : scala.concurrent.Future[T]
我们看到,这个方法接收一个函数用来异步计算,以及一个ExecutionContext来处理线程管理逻辑,整个方法返回一个Future对象。
Future Object里面也有其他创建Future的方法,例如Future.successful可以直接创建一个成功的future,具体可以仔细研究下Future Object。
Future的重要伙伴ExecutionContext
在之前创建Future的小节里面提到了创建Future的方法之一是使用apply,而这个方法必须要一个隐式的ExecutionContext,那么到底什么是ExecutionContext?
下面是一个基本的定义;
An ExecutionContext is similar to an Executor: it is free to execute computations in a new thread, in a pooled thread or in the current thread。
这里提到的Executor是java.util.concurrent.Executor,它是Java的线程管理的抽象。Java这么做的目的是把线程的执行和线程的管理分开,让计算代码只考虑计算的业务问题而不必理会线程的管理细节。ExecutionContext是Java的Executor在Scala的定义,所以可以把ExecutionContext想象成一个自动管理的线程池,我们把任务交给它就会运行,我们只需要处理好任务本身的业务逻辑就行,这样处理还有一个好处,就是不用频繁的创建线程,这样可以节省系统开销。Scala的Future底层是基于JDK的ForkJoinPool,它也是一种Executor实现
Future的Callback机制
我们知道Java的老Future需要get来阻塞线程以获得结果,相比之下scala的则不需要,为什么可以这样?因为回调机制。Scala允许开发者在一个Future上设置回调,在任务完成之后会自动的执行回调里面的代码,这样的机制真正实现了完全的异步,使得整个系统执行效率得以提高。
一个常见的回调方法有:
OnComplete(Scala.concurrent.Future class):
def onComplete[U](f : scala.Function1[scala.util.Try[T], U])(implicit executor : scala.concurrent.ExecutionContext) : scala.Unit
这个方法接收一个Try参数,返回一个U类型,也就是说如果Future成功,参数是Success否则是Failure
除了OnComplete,Scala还有不少其他回调函数,且这些函数本身是函数式的(没有副作用),所以很容易就能嵌套使用。我们可能经常使用的组合函数有:
map
flatMap
foreach
filter
foldLeft/foldRight
andThen
大家妥善使用这些组合函数可以写出功能强大且优雅的代码。有一个问题需要注意,Future组合在一起,假如发生异常是怎么处理的?如果该Future嵌套的Future成功完成,那么该Future可以正确拿到嵌套Future的值;如果嵌套Future发生了异常,那么Future获得的就是失败的Future。
关于Future在For中的并发
在Scala中,如果combinator比较复杂,可以使用For语句来简化,一个常见的操作如下:
for {
a <- Future {println(s"${Thread.currentThread().getName} 1")
Thread.sleep(1000)}
b <- Future {println(s"${Thread.currentThread().getName} 2")
Thread.sleep(300)}
c <- Future {
println(s"${Thread.currentThread().getName} 3")
Thread.sleep(300)}
} yield {
println(s"${Thread.currentThread().getName} 4 ")
}
正常情况你要依赖a,b,c的结果需要a.flatMap b.FlatMap在c.map,这样太啰嗦,直接for一下就搞定了,所以其实for是通过map,flatMap和withFilter实现的。这样一转换整个代码清爽简洁了不少。
不过这里有一个可能的坑,就是for里面的Future.apply不是并发的,也即a,b,c这三个future是按顺序执行的,所以如果对性能要求高请不要这么写。这也是因为for的底层机制是map,它是在a的Future返回后再执行的b,因为b的Future是在a的回调中定义的。
理解Promise
Promise的定义
Promises are objects that can be assigned a value or an exception only once. This is why promises are sometimes also called single-assignment variables.
Promise一个关键特性是,promise只可以被赋值一次,多次赋值会抛异常。
Promise的作用
首先,Future的实现基于Promise
下面的代码来自 Learning Concurrent Programming in Scala
def myFuture[T](b: =>T): Future[T] = {
val p = Promise[T]
global.execute(new Runnable {
override def run(): Unit = try {
p.success(b)
} catch {
case NonFatal(e) => p.failure(e)
}
})
p.future
}
通过在另一个线程去success 或者 failure一个promise,我们能够实现一个Future。
正是因为Future背后实现的原理基于Promise,我们理解了Promise就可以很方便的给Future添加功能。
例如下面的例子是让Future在一定时间后timeout,代码来自Learning Concurrent Programming in Scala:
def timeout(t: Long): Future[Unit] = {
val p = Promise[Unit]
timer.schedule(new TimerTask {
def run(): Unit = {
p.success()
timer.cancel()
}
}, t)
p.future
}
timeout(1000) foreach {
case _ => println("Timed out!")
}
Future和Promise的实现原理
Future和Promise的关系
我们在之前已经完成了Future和Promise的介绍,那么一个常见的问题是,它们两个到底是什么关系?我们可以用下面一段话来概括:
A promise and a future represent two aspects of a single-assignment variable, the promise allows you to assign a value to the future object, whereas the future allows you to read the value.
简而言之,你可以把Promise和Future想象成数据流的两端,数据从Promise进,从Future读。因为Promise和Future都是只能赋值一次,所以不存在脏读的问题。Scala也正是通过这两个工具实现了一个基本的并发编程的抽象,能够解决不少并发编程的问题。
Future的实现原理
为了了解Future的实现原理,我这里查看了Scala的Library:scala-library-2.11.7的源码:
当我们调用Future.apply方法时,实际上是调用impl.Future,这是一个object,实际的实现在这个obj里面,下面是Future.apply的函数实现:
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)
接下来我们看看Impl包中Future object的实现(注释是我加的):
private[concurrent] object Future {
// 线程执行的对象
class PromiseCompletingRunnable[T](body: => T) extends Runnable {
val promise = new Promise.DefaultPromise[T]()
override def run() = {
// 根据body的结果设置Promise的值
promise complete {
try Success(body) catch { case NonFatal(e) => Failure(e) }
}
}
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
val runnable = new PromiseCompletingRunnable(body)
// 通过ExecutionContext来分配线程执行runnable中的代码
executor.prepare.execute(runnable)
runnable.promise.future
}
}
这段代码跟我们之前自己实现Future大同小异,通过ExecutionContext处理一个Runnable的对象,这个对象的run方法是被线程池里的线程调度执行的。通过这段代码,我们了解到调用Future.apply之后,其apply中的代码就被提交给线程池执行了。
所以,Future本身很简单,但是要理解Scala的Future最关键的回调注册执行的机制,就需要先理解Promise了:
Promise的实现原理
Scala的Promise是有状态的,这个状态有三种可选(来自scala-library-2.11.7中DefaultPromise的注释):
* A DefaultPromise has three possible states. It can be:
*
* 1. Incomplete, with an associated list of callbacks waiting on completion.
* 2. Complete, with a result.
* 3. Linked to another DefaultPromise.
1 未完成状态,这个时候会关联一个回调列表,在Promise被完成的时候向线程池申请资源执行这些回调列表;
2 完成状态,就是这个promise已经被赋值了,已经完成了,那么就会关联一个值;
3 链接到其他DefaultPromise,据介绍是为了处理flatMap的内存泄漏设置的,具体先可以不用理解;
Scala默认的Promise是DefaultPromise实现的,promise本身实现了Future的接口,所以真正的Future对象其实是DefaultPromise。
Future的回调注册是如何实现的?
有了上面的基础,我们先看下DefaultPromise状态的初始值:
updateState(null, Nil) // The promise is incomplete and has no callback
这行代码来自DefaultPromise的构造函数,也就是说一个Promise在被初始化之后,是未完成状态,被关联一个空列表。
接着,我们来试图通过代码理解Future的回调注册机制,这个机制可以分为两块:
1 回调执行;
2 回调注册;
我们先来理解回调执行,我们仔细看Impl.Future中线程执行的代码,会发现promise是通过调用complete来结束的,那么这个complete会做什么?
def complete(result: Try[T]): this.type =
if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
从代码中,我们看到Promise只能被complete一次,多次complete会抛一个IllegalStateException。同时,我们看到它调用的是tryComplete,这个tryComplete的实现如下:
def tryComplete(value: Try[T]): Boolean = {
val resolved = resolveTry(value)
tryCompleteAndGetListeners(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
这个主要是看执行的代码结果是否成功,接着调用tryCompleteAndGetListeners:
@tailrec
private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v)
case _: DefaultPromise[_] =>
compressedRoot().tryCompleteAndGetListeners(v)
case _ => null
}
}
到这里就很清晰了,如果这个Promise的状态关联了列表(未完成状态),则首先把Promise设置成完成状态并关联完成的值,接着返回回调列表;如果是代理到另一个Promise,则继续调用另一个promise的函数,否则返回一个null。
而这个回调列表会被tryComplete通过executeWithValue提交给线程池执行。
接下来,我们再来看下注册流程,以map函数为例,它的代码在Future的Object里面:
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
val p = Promise[S]()
onComplete { v => p complete (v map f) }
p.future
}
这个map同样通过promise来返回新的Future,接着就是调用Future的OnComplete, 在等待当前Future执行结束后,执行f函数返回值。
我们看下DefaultPromise类中onComplete的实现:
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val preparedEC = executor.prepare()
val runnable = new CallbackRunnable[T](preparedEC, func)
dispatchOrAddCallback(runnable)
}
上面的代码反应回调也是要申请一个新的线程来执行,接着,我们看看dispatchOrAddCallback:
private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
getState match {
case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable)
case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable)
}
}
如果当前的Promise是已完成状态,则直接执行这个注册;如果是未完成,会先通过updateState来把当前的回调添加到state的回调列表中,注意(这里的updateState用的CAS,所以失败了会继续重试)
我们通过图表来总结下,回调执行和回调注册的流程:
总结
通过上面的研究,我们不难发现,Scala的Future本质上是Promise,而Promise是更高层次的抽象,开发者通常接触的是Future,如果要更深层次了解才会接触到Promise。不管怎样,Scala的Future通过强大的注册回调机制,使得Scala原生的(未使用AKKA)的多线程代码写起来就足够的简洁优雅,当然如果代码很复杂,多层回调看起来还是挺痛苦的,这个时候,就该轮到AKKA上场了。
下面是相关的思维导图:
参考资料:
https://docs.scala-lang.org/overviews/core/futures.html
https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
https://spring.io/blog/2016/06/07/notes-on-reactive-programming-part-i-the-reactive-landscape
http://www.reactive-streams.org/
https://www.reactivemanifesto.org/zh-CN
Learning Concurrent Programming in Scala
以上是关于Scala并发编程的主要内容,如果未能解决你的问题,请参考以下文章
linux打开终端如何启动scala,如何在终端下运行Scala代码片段?
大数据学习:Scala隐式转换和并发编程(DT大数据梦工厂)
全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段