Scala 中的异步 IO 与期货
Posted
技术标签:
【中文标题】Scala 中的异步 IO 与期货【英文标题】:Asynchronous IO in Scala with futures 【发布时间】:2012-10-17 08:35:53 【问题描述】:假设我要从某些 URL 下载一个(可能很大的)图像列表。我正在使用 Scala,所以我会做的是:
import scala.actors.Futures._
// Retrieve URLs from somewhere
val urls: List[String] = ...
// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future download url )
// Do something (display) when complete
fimages.foreach (_.foreach (display _))
我对 Scala 有点陌生,所以这对我来说仍然有点像魔术:
这是正确的做法吗?如果不是,有什么替代方案? 如果我要下载 100 张图片,这会一次创建 100 个线程,还是会使用线程池? 最后一条指令 (display _
) 是否会在主线程上执行,如果没有,我如何确保它是?
感谢您的建议!
【问题讨论】:
【参考方案1】:在 Scala 2.10 中使用期货。他们是 Scala 团队、Akka 团队和 Twitter 之间的联合工作,旨在实现更标准化的未来 API 和跨框架使用的实现。我们刚刚发布了一份指南:http://docs.scala-lang.org/overviews/core/futures.html
除了完全非阻塞(默认情况下,尽管我们提供了托管阻塞操作的能力)和可组合性之外,Scala 的 2.10 未来还带有一个隐式线程池来执行您的任务,以及一些用于管理时间的实用程序出局。
import scala.concurrent.future, blocking, Future, Await, ExecutionContext.Implicits.global
import scala.concurrent.duration._
// Retrieve URLs from somewhere
val urls: List[String] = ...
// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map
url => future blocking download url
// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)
上面,我们先导入一些东西:
future
:创造未来的 API。
blocking
:用于托管阻塞的 API。
Future
:Future 伴随对象,其中包含许多用于 集合 futures 的有用方法。
Await
:用于阻塞未来的单例对象(将其结果传输到当前线程)。
ExecutionContext.Implicits.global
:默认全局线程池,一个 ForkJoin 池。
duration._
:用于管理超时持续时间的实用程序。
imagesFuts
与您最初所做的基本相同 - 这里唯一的区别是我们使用托管阻塞 - blocking
。它通知线程池您传递给它的代码块包含长时间运行或阻塞操作。这允许池临时生成新的工作人员,以确保不会发生所有工作人员都被阻塞的情况。这样做是为了防止阻塞应用程序中的饥饿(锁定线程池)。请注意,线程池还知道托管阻塞块中的代码何时完成 - 因此它将在该点移除备用工作线程,这意味着池将收缩回其预期大小。
(如果你想绝对避免创建额外的线程,那么你应该使用 AsyncIO 库,例如 Java 的 NIO 库。)
然后我们使用Future伴随对象的集合方法将imagesFuts
从List[Future[...]]
转换为Future[List[...]]
。
Await
对象是我们如何确保在调用线程上执行display
的方法——Await.result
只是强制当前线程等待,直到它传递的未来完成。 (这在内部使用托管阻塞。)
【讨论】:
感谢您的深入回答!如果我理解正确,如果您没有指定“阻塞”,那么如果每个工作人员都无限期地保持忙碌,那么线程池可能会用完工作人员并永远阻塞?另外,我可以创建自己的ExecutionContext
来强制完成回调(当然不是实际的后台进程)在特定线程(即 UI 线程,使用特定于框架的方法)上异步执行吗?跨度>
技术上:不惜一切代价避免阻塞。只有在别无选择的情况下才进行屏蔽。
但要回答您之前的问题 - 是的,如果所有线程都阻塞并且您不使用托管阻塞,则默认 FJPool 可能会耗尽工作人员。是的,您可以创建自己的ExecutionContext
,例如使用 Swing 的invokeLater
,然后将其显式传递给futImages
上的foreach
,而不是使用Await.result
blocking
背后的东西实际上是如何工作的?它有自己的线程池,还是在我们通过blocking
提交任务时创建新线程?
你为什么在那里使用阻塞url => future blocking download url
,为什么不只使用url => future download url
?【参考方案2】:
val all = Future.traverse(urls) url =>
val f = future(download url) /*(downloadContext)*/
f.onComplete(display)(displayContext)
f
Await.result(all, ...)
-
在 2.10 中使用 scala.concurrent.Future,现在是 RC。
使用隐式 ExecutionContext
新的 Future 文档明确指出,如果值可用,onComplete(和 foreach)可以立即评估。老演员Future也做同样的事情。根据您对显示的要求,您可以提供合适的 ExecutionContext(例如,单线程执行程序)。如果你只是想让主线程等待加载完成,traverse 会给你一个等待的未来。
【讨论】:
【参考方案3】:是的,我觉得不错,但您可能想研究更强大的 twitter-util 或 Akka Future API(Scala 2.10 将有一个这种风格的新 Future 库)。
李>它使用线程池。
不,不会。为此,您需要使用 GUI 工具包的标准机制(SwingUtilities.invokeLater
用于 Swing,Display.asyncExec
用于 SWT)。例如。
fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable display im )))
【讨论】:
感谢您的回答,我很高兴知道我的方法是明智的!我实际上正在尝试 android 的 Scala,所以与可怕的 Java 语法相比,它会派上用场! 关于#3,在你写答案之前,我正在考虑并尝试一些简单的测试用例,它似乎确实在主线程上执行。我刚刚创建了一个简单的future"test"
并在上面运行foreach(s => println(Thread.currentThread.getName())
,它打印了main
。我是不是误会了什么?
@F.X.我只是在 Scala 控制台中做了两次同样的事情(为了同样的未来),得到了 Thread-15
和 Thread-16
。它可能取决于 Scala 版本。
我认为 Scala 控制台会为您键入的每个命令生成线程。我刚刚尝试了println(...getName()); f.foreach(s => ...getName())
(一行)并得到了两次Thread-20
。很奇怪。
是的,看起来是这样。至少,由于文档没有说它是在主线程中调用的,我不会这么认为。以上是关于Scala 中的异步 IO 与期货的主要内容,如果未能解决你的问题,请参考以下文章
Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap