如何将 Scala 期货与超时联系起来?

Posted

技术标签:

【中文标题】如何将 Scala 期货与超时联系起来?【英文标题】:How to chain Scala Futures with timeouts? 【发布时间】:2021-06-17 13:57:49 【问题描述】:

假设我有几个函数返回Future

def fab(a: A, timeOutMillis: Long): Future[B] = ???
def fbc(b: B, timeOutMillis: Long): Future[C] = ???
def fcd(c: C, timeOutMillis: Long): Future[D] = ???

如果timeOutMillis > 0 函数返回在timeOutMillis 时间段内完成的期货。否则他们返回一个失败的Future

现在我想编写这些函数来编写facfad

def fac(a: A, timeOutMillis: Long): Future[C] = ???
def fad(a: A, timeOutMillis: Long): Future[D] = ???

所以我可以像这样实现fac

def fac(a: A, timeOutMillis: Long): Future[C] = 
  val startTime = System.currentTimeMillis()
  for 
     b <- fab(a, timeOutMillis)
     timeAB = System.currentTimeMillis() - startTime
     c <- fbc(b, timeOutMillis - timeAB)
   yield c

注意我不想等待完成期货,因此我不使用Await

这个fac 实现可能会起作用,但它看起来很笨拙并且包含样板代码。 你会如何建议链 Futures 超时?

附:我正在考虑覆盖flatMapFuture 来计算下一次超时。你怎么看?

【问题讨论】:

【参考方案1】:

我想,你正在寻找scala.duration.Deadline

    d = timeoutMillis.millis.fromNow
    fab(a, d.timeLeft.toMillis)
      .flatMap(fbc(_, d.timeLeft.toMillis))
      .flatMap(fcd(_, d.timeLeft.toMillis))

如果您不喜欢重复 d.timeTimeLeft.toMillis 的口头禅, 你可以把它折叠成一个折叠,也可以将它推广到任何数字 链式调用:

Seq(fab _, fbc _, fcd _)
  .foldLeft(Future(a))  case (last, next) => 
     last.flatMap(next(_, d.timeLeft.toMillis))
   

【讨论】:

太棒了!这可能就是我正在寻找的。 foldDeadline 看起来真的很不错:)) 现在我想知道是否有可能让客户通过截止日期透明。也就是说,我只想写for b &lt;- fab(a); c &lt;- fbc(b) yield c,而不明确传递deadline.timeLeft 好吧,我认为,您必须修改原始函数定义才能做到这一点。 def fab(a: A)(implicit deadline: Deadline = 100.years.fromNow) = ??? 我不会那样做的。就个人而言,我发现子类化 scala 标准库类几乎不值得麻烦。它也不像看起来那么简单。您的 .flatMap 需要返回您的新类实例,因此您不能只调用 .super。出于同样的原因,您还必须覆盖map(和大多数其他方法)。此外,这个新的flatMap 的语义会有所不同(它必须只适用于某些类型的函数)...... 隐含截止日期?【参考方案2】:

为什么不:

import scala.concurrent.Await,Future

Await.ready(fut1, 5.seconds).flatMap  sucInTime =>
  fut2

【讨论】:

首先我不想等待完成。其次,它似乎没有按预期工作。我不明白第二次未来的超时是如何计算的。 你需要更清楚地了解需求。 你是对的。我会澄清这个问题。

以上是关于如何将 Scala 期货与超时联系起来?的主要内容,如果未能解决你的问题,请参考以下文章

org.apache.spark.rpc.RpcTimeoutException:期货在 [120 秒] 后超时。此超时由 spark.rpc.lookupTimeout 控制

等待具有超时的期货序列,而不会在 TimeoutException 上失败

过滤时Spark sql“期货在300秒后超时”

收到 TimeoutException 的可能原因是啥:使用 Spark 时,期货在 [n 秒] 后超时 [重复]

Scala - 使用超时重试HTTP请求

Scala如何使用akka actor有效地处理超时操作