Scala 中的异步 IO 与期货

Posted

技术标签:

【中文标题】Scala 中的异步 IO 与期货【英文标题】:Asynchronous IO in Scala with futures 【发布时间】:2012-10-17 08:35:53 【问题描述】:

假设我要从某些 URL 下载一个(可能很大的)图像列表。我正在使用 Scala,所以我会做的是:

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future  download url )

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

我对 Scala 有点陌生,所以这对我来说仍然有点像魔术:

这是正确的做法吗?如果不是,有什么替代方案? 如果我要下载 100 张图片,这会一次创建 100 个线程,还是会使用线程池? 最后一条指令 (display _) 是否会在主线程上执行,如果没有,我如何确保它是?

感谢您的建议!

【问题讨论】:

【参考方案1】:

在 Scala 2.10 中使用期货。他们是 Scala 团队、Akka 团队和 Twitter 之间的联合工作,旨在实现更标准化的未来 API 和跨框架使用的实现。我们刚刚发布了一份指南:http://docs.scala-lang.org/overviews/core/futures.html

除了完全非阻塞(默认情况下,尽管我们提供了托管阻塞操作的能力)和可组合性之外,Scala 的 2.10 未来还带有一个隐式线程池来执行您的任务,以及一些用于管理时间的实用程序出局。

import scala.concurrent.future, blocking, Future, Await, ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map 
  url => future  blocking  download url  


// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)

上面,我们先导入一些东西:

future:创造未来的 API。 blocking:用于托管阻塞的 API。 Future:Future 伴随对象,其中包含许多用于 集合 futures 的有用方法。 Await:用于阻塞未来的单例对象(将其结果传输到当前线程)。 ExecutionContext.Implicits.global:默认全局线程池,一个 ForkJoin 池。 duration._:用于管理超时持续时间的实用程序。

imagesFuts 与您最初所做的基本相同 - 这里唯一的区别是我们使用托管阻塞 - blocking。它通知线程池您传递给它的代码块包含长时间运行或阻塞操作。这允许池临时生成新的工作人员,以确保不会发生所有工作人员都被阻塞的情况。这样做是为了防止阻塞应用程序中的饥饿(锁定线程池)。请注意,线程池还知道托管阻塞块中的代码何时完成 - 因此它将在该点移除备用工作线程,这意味着池将收缩回其预期大小。

(如果你想绝对避免创建额外的线程,那么你应该使用 AsyncIO 库,例如 Java 的 NIO 库。)

然后我们使用Future伴随对象的集合方法将imagesFutsList[Future[...]]转换为Future[List[...]]

Await 对象是我们如何确保在调用线程上执行display 的方法——Await.result 只是强制当前线程等待,直到它传递的未来完成。 (这在内部使用托管阻塞。)

【讨论】:

感谢您的深入回答!如果我理解正确,如果您没有指定“阻塞”,那么如果每个工作人员都无限期地保持忙碌,那么线程池可能会用完工作人员并永远阻塞?另外,我可以创建自己的ExecutionContext 来强制完成回调(当然不是实际的后台进程)在特定线程(即 UI 线程,使用特定于框架的方法)上异步执行吗?跨度> 技术上:不惜一切代价避免阻塞。只有在别无选择的情况下才进行屏蔽。 但要回答您之前的问题 - 是的,如果所有线程都阻塞并且您不使用托管阻塞,则默认 FJPool 可能会耗尽工作人员。是的,您可以创建自己的ExecutionContext,例如使用 Swing 的invokeLater,然后将其显式传递给futImages 上的foreach,而不是使用Await.result blocking 背后的东西实际上是如何工作的?它有自己的线程池,还是在我们通过blocking提交任务时创建新线程? 你为什么在那里使用阻塞url => future blocking download url ,为什么不只使用url => future download url 【参考方案2】:
val all = Future.traverse(urls) url =>
  val f = future(download url) /*(downloadContext)*/
  f.onComplete(display)(displayContext)
  f

Await.result(all, ...)
    在 2.10 中使用 scala.concurrent.Future,现在是 RC。 使用隐式 ExecutionContext 新的 Future 文档明确指出,如果值可用,onComplete(和 foreach)可以立即评估。老演员Future也做同样的事情。根据您对显示的要求,您可以提供合适的 ExecutionContext(例如,单线程执行程序)。如果你只是想让主线程等待加载完成,traverse 会给你一个等待的未来。

【讨论】:

【参考方案3】:

    是的,我觉得不错,但您可能想研究更强大的 twitter-util 或 Akka Future API(Scala 2.10 将有一个这种风格的新 Future 库)。

    李>

    它使用线程池。

    不,不会。为此,您需要使用 GUI 工具包的标准机制(SwingUtilities.invokeLater 用于 Swing,Display.asyncExec 用于 SWT)。例如。

    fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable  display im )))
    

【讨论】:

感谢您的回答,我很高兴知道我的方法是明智的!我实际上正在尝试 android 的 Scala,所以与可怕的 Java 语法相比,它会派上用场! 关于#3,在你写答案之前,我正在考虑并尝试一些简单的测试用例,它似乎确实在主线程上执行。我刚刚创建了一个简单的future"test" 并在上面运行foreach(s => println(Thread.currentThread.getName()),它打印了main。我是不是误会了什么? @F.X.我只是在 Scala 控制台中做了两次同样的事情(为了同样的未来),得到了 Thread-15Thread-16。它可能取决于 Scala 版本。 我认为 Scala 控制台会为您键入的每个命令生成线程。我刚刚尝试了println(...getName()); f.foreach(s => ...getName())(一行)并得到了两次Thread-20。很奇怪。 是的,看起来是这样。至少,由于文档没有说它是在主线程中调用的,我不会这么认为。

以上是关于Scala 中的异步 IO 与期货的主要内容,如果未能解决你的问题,请参考以下文章

Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap

Scala 捕获一系列期货引发的错误

如何将 Scala 期货与超时联系起来?

Scala 期货 - 如何在完成时结束?

在 Scala Akka 期货中,map 和 flatMap 有啥区别?

期货未终止的Scala主类