如何将 Scala ARM 与期货一起使用?
Posted
技术标签:
【中文标题】如何将 Scala ARM 与期货一起使用?【英文标题】:How to use Scala ARM with Futures? 【发布时间】:2014-01-10 07:46:59 【问题描述】:我想实现 ARM(自动资源管理)模式,其中资源是异步使用的。
问题
假设我的资源如下所示:
class MyResource
def foo() : Future[MyResource] = ???
// Other methods returning various futures
def close() : Unit = ???
object MyResource
def open(name: String): Future[MyResource] = ???
所需的使用模式是:
val r : Future[MyResource] = MyResource.open("name")
r flatMap (r =>
r.foo() /* map ... */ andThen
case _ => r.close()
)
省略的映射函数可能很复杂,包括对返回 Futures 的 r
方法进行重复调用的分支和链接期货。
我想确保r.close()
在所有 未来的延续完成(或失败)之后被调用。在每个呼叫站点手动执行此操作很容易出错。这需要 ARM 解决方案。
尝试的解决方案
scala-arm 库通常是同步的。这段代码不会做正确的事情,因为 close() 会在块内的期货完成之前被调用:
for (r <- managed(MyResource.open("name")))
r map (_.foo()) // map ...
我想使用这个包装器:
def usingAsync[T](opener: => Future[MyResource]) (body: MyResource => Future[T]) : Future[T] =
opener flatMap
myr => body(myr) andThen case _ => myr.close()
然后调用站点将如下所示:
usingAsync(MyResource.open("name")) ( myr =>
myr.foo // map ...
)
但是,块内的代码将负责返回一个 Future,该 Future 在该块创建的所有其他期货完成时完成。如果它不小心没有,那么在所有使用它的期货完成之前,该资源将再次关闭。并且不会有静态验证来捕获此错误。例如,这将是一个运行时错误:
usingAsync(MyResource.open("name")) ( myr =>
myr.foo() // Do one thing
myr.bar() // Do another
)
如何解决?
显然,我可以使用 scala-arm 的定界延续支持 (CPS)。它看起来有点复杂,我害怕弄错。它需要启用编译器插件。另外,我的团队对 scala 还很陌生,我不想要求他们使用 CPS。
CPS 是唯一的出路吗?是否有使用 Futures 更简单地做到这一点的库或设计模式,或者使用 scala-arm 做到这一点的示例?
【问题讨论】:
检查 Monix 任务,它有一个完成触发器monix.io/docs/3x/eval/task.html#clean-up-resources-on-finish 【参考方案1】:响应式扩展 (Rx) 可能是一种替代解决方案。 这种编程范式的势头越来越大,现在可以在包括 Scala 在内的多种语言中使用。
Rx 的基础是创建一个 Observable,它是异步事件的来源。 Observable 可以以复杂的方式链接起来,这就是它的力量。你订阅一个 Observable 来监听 onNext、onError 和 onComplete 事件。您还会收到一个允许您取消的订阅。
我认为您可能会在 onCompleted 和/或 onError 处理程序中添加一个 resource.close() 调用。
请参阅 RxScala 文档:
Observable.subscribe(
onNext: (T) ⇒ Unit,
onError: (Throwable) ⇒ Unit,
onCompleted: () ⇒ Unit): Subscription
更多信息:
RxScala 网站:http://rxscala.github.io/ RxScala 可观察:http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable NetFlix 上 Ben Christensen 的精彩介绍:http://www.infoq.com/presentations/netflix-functional-rx Erik Meijer 在 Martin Odersky、Erik Meijer 和 Roland Kuhn 的 Coursera 课程响应式编程原理中提供了链式 Observable 的代码示例。【讨论】:
(替换了之前的评论)我们已经使用了 play-iteratee,它看起来与 Observable 相似,实际上更胜一筹。我看不出他们中的任何一个会有什么帮助。具有返回 Futures 和 close() 的 multiple 方法的资源与 observables 流不同。你能详细说明一下吗? 那时我会使用 iteratee。干杯。以上是关于如何将 Scala ARM 与期货一起使用?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 IntelliJ 与 Play Framework 和 Scala 一起使用
如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行