等待具有超时的期货序列,而不会在 TimeoutException 上失败

Posted

技术标签:

【中文标题】等待具有超时的期货序列,而不会在 TimeoutException 上失败【英文标题】:Await for a Sequence of Futures with timeout without failing on TimeoutException 【发布时间】:2022-01-02 18:54:38 【问题描述】:

我有一系列相同类型的 scala Futures。

我想在有限的时间后得到整个序列的结果,而有些期货可能成功,有些可能失败,有些尚未完成,未完成的期货应视为失败。

我不想按顺序使用 Await 每个未来。

我确实看过这个问题:Scala waiting for sequence of futures 并尝试从那里使用解决方案,即:

  private def lift[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    futures.map(_.map  Success(_) .recover  case t => Failure(t) )

  def waitAll[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    Future.sequence(lift(futures))

  futures: Seq[Future[MyObject]] = ...
  val segments = Await.result(waitAll(futures), waitTimeoutMillis millis)

但我仍然收到 TimeoutException,我猜是因为某些期货尚未完成。 该答案还指出,

现在 Future.sequence(lifted) 将在每个未来完成时完成,并使用 Try 表示成功和失败。

但我希望我的未来在超时之后完成,而不是在序列中的每个未来都完成时。我还能做什么?

【问题讨论】:

我认为这对于纯 scala 期货很难做到,因为在未来的 api 中没有“本机”超时处理。等待的超时不会这样做。我认为您应该研究允许此类处理的更完整的“异步”库。如果您有兴趣,我可以在 monix 中为您提供一个示例 @IvanStanislavciuc 是的,请提供一个示例,其中包含您认为最好的任何依赖项,我正在寻找高效的代码,我的期货清单将非常大 【参考方案1】:

如果我使用原始的Future(而不是一些内置此功能的 IO monad,或者没有一些 Akka 实用程序),我会一起破解实用程序,例如:

// make each separate future timeout
object FutureTimeout 
  // separate EC for waiting
  private val timeoutEC: ExecutorContext = ...

  private def timeout[T](delay: Long): Future[T] = Future 
    blocking 
      Thread.sleep(delay)
    
    throw new Exception("Timeout")
  (timeoutEC)

  def apply[T](fut: Future[T], delat: Long)(
    implicit ec: ExecutionContext
  ): Future[T] = Future.firstCompletedOf(Seq(
    fut,
    timeout(delay)
  ))

然后

Future.sequence(
  futures
    .map(FutureTimeout(_, delay))
    .map(Success(_))
    .recover  case e => Failure(e) 
)

由于每个未来最多会在delay 之后终止,因此我们可以在那之后将它们收集到一个结果中。

您必须记住,无论您如何触发超时,您都无法保证超时的Future 停止执行。它可以在某个地方的某个线程上运行,只是你不会等待结果。 firstCompletedOf 只是让这场比赛更加明确。

其他一些实用程序(例如 Cats Effect IO)允许您取消计算(例如在这样的比赛中使用),但您仍然必须记住 JVM 不能任意“杀死”正在运行的线程,因此取消会在一个计算阶段完成之后和下一个计算开始之前发生(例如在.maps 或.flatMaps 之间)。

如果您不害怕添加外部部门,还有其他(更可靠,因为 Thread.sleep 只是暂时的丑陋黑客)方法来超时 Future,例如 Akka utils。另见其他问题like this。

【讨论】:

是的,这就是我所说的native 超时。您需要阻止整个线程来触发“超时”异常。我认为这不应该在生产系统中运行。这可能会耗尽您的执行上下文,并且业务逻辑将停止工作,因为它等待太多超时。 我同意这就是我添加免责声明的原因(如果不允许我使用 Akka 或 Cats IO 或其他什么)。可能大部分时间都可以,除非你会产生太多等待线程。但是 Akka 及其 Scheduler 应该能够帮助处理这种情况,类似于 IO monads。但我希望这些问题主要来自那些坚持原始Futures 并且由于公司/项目的政策而无法迁移到其他任何东西的人。 @MateuszKubuszok 感谢您的解决方案,但就像您说的那样,为我的每个未来创建另一个线程似乎效率很低。我可以集成任何其他依赖项,所以如果你能提供一个最有效和最简单的外部库解决方案,那将是最受欢迎的! 如果您想要较少侵入性的更改,请查看akka.pattern.after。或任何支持IO.racePairtimer.sleep(..) 之类的IO monad。【参考方案2】:

这是使用monix的解决方案

import monix.eval.Task
import monix.execution.Scheduler

val timeoutScheduler = Scheduler.singleThread("timeout") //it's safe to use single thread here because timeout tasks are very fast

def sequenceDiscardTimeouts[T](tasks: Task[T]*): Task[Seq[T]] = 
  Task
    .parSequence(
      tasks
        .map(t =>
          t.map(Success.apply) // Map to success so we can collect the value
            .timeout(500.millis)
            .executeOn(timeoutScheduler) //This is needed to run timesouts in dedicated scheduler that won't be blocked by "blocking"/io work if you have any
            .onErrorRecoverWith  ex =>
              println("timed-out")
              Task.pure(Failure(ex)) //It's assumed that any error is a timeout. It's possible to "catch" just timeout exception here
            
        )
    )
    .map  res =>
      res.collect  case Success(r) => r 
    


测试代码

implicit val mainScheduler = Scheduler.fixedPool(name = "main", poolSize = 10)


def slowTask(msg: String) = 
  Task.sleep(Random.nextLong(1000).millis) //Sleep here to emulate a slow task
    .map  _ =>
      msg
    



val app = sequenceDiscardTimeouts(
  slowTask("1"),
  slowTask("2"),
  slowTask("3"),
  slowTask("4"),
  slowTask("5"),
  slowTask("6")
)

val started: Long = System.currentTimeMillis()
app.runSyncUnsafe().foreach(println)
println(s"Done in $System.currentTimeMillis() - started millis")

这将为每次运行打印不同的输出,但应该如下所示

timed-out
timed-out
timed-out
3
4
5
Done in 564 millis

请注意使用两个单独的调度程序。这是为了确保即使main 调度程序忙于业务逻辑也会触发超时。您可以通过减少主调度程序的poolSize 来测试它。

【讨论】:

以上是关于等待具有超时的期货序列,而不会在 TimeoutException 上失败的主要内容,如果未能解决你的问题,请参考以下文章

如何从具有超时的终端读取UTF-8字符?

期货发送并发HTTP GET请求

在等待期望时使用 XCTFail 并不能防止超时

如何使赛普拉斯等待具有多个结果的异步搜索完成而不会导致测试失败

过滤时Spark sql“期货在300秒后超时”

收到 TimeoutException 的可能原因是啥:使用 Spark 时,期货在 [n 秒] 后超时 [重复]