等待具有超时的期货序列,而不会在 TimeoutException 上失败
Posted
技术标签:
【中文标题】等待具有超时的期货序列,而不会在 TimeoutException 上失败【英文标题】:Await for a Sequence of Futures with timeout without failing on TimeoutException 【发布时间】:2022-01-02 18:54:38 【问题描述】:我有一系列相同类型的 scala Futures。
我想在有限的时间后得到整个序列的结果,而有些期货可能成功,有些可能失败,有些尚未完成,未完成的期货应视为失败。
我不想按顺序使用 Await 每个未来。
我确实看过这个问题:Scala waiting for sequence of futures 并尝试从那里使用解决方案,即:
private def lift[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
futures.map(_.map Success(_) .recover case t => Failure(t) )
def waitAll[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
Future.sequence(lift(futures))
futures: Seq[Future[MyObject]] = ...
val segments = Await.result(waitAll(futures), waitTimeoutMillis millis)
但我仍然收到 TimeoutException,我猜是因为某些期货尚未完成。 该答案还指出,
现在 Future.sequence(lifted) 将在每个未来完成时完成,并使用 Try 表示成功和失败。
但我希望我的未来在超时之后完成,而不是在序列中的每个未来都完成时。我还能做什么?
【问题讨论】:
我认为这对于纯 scala 期货很难做到,因为在未来的 api 中没有“本机”超时处理。等待的超时不会这样做。我认为您应该研究允许此类处理的更完整的“异步”库。如果您有兴趣,我可以在 monix 中为您提供一个示例 @IvanStanislavciuc 是的,请提供一个示例,其中包含您认为最好的任何依赖项,我正在寻找高效的代码,我的期货清单将非常大 【参考方案1】:如果我使用原始的Future
(而不是一些内置此功能的 IO monad,或者没有一些 Akka 实用程序),我会一起破解实用程序,例如:
// make each separate future timeout
object FutureTimeout
// separate EC for waiting
private val timeoutEC: ExecutorContext = ...
private def timeout[T](delay: Long): Future[T] = Future
blocking
Thread.sleep(delay)
throw new Exception("Timeout")
(timeoutEC)
def apply[T](fut: Future[T], delat: Long)(
implicit ec: ExecutionContext
): Future[T] = Future.firstCompletedOf(Seq(
fut,
timeout(delay)
))
然后
Future.sequence(
futures
.map(FutureTimeout(_, delay))
.map(Success(_))
.recover case e => Failure(e)
)
由于每个未来最多会在delay
之后终止,因此我们可以在那之后将它们收集到一个结果中。
您必须记住,无论您如何触发超时,您都无法保证超时的Future
停止执行。它可以在某个地方的某个线程上运行,只是你不会等待结果。 firstCompletedOf
只是让这场比赛更加明确。
其他一些实用程序(例如 Cats Effect IO)允许您取消计算(例如在这样的比赛中使用),但您仍然必须记住 JVM 不能任意“杀死”正在运行的线程,因此取消会在一个计算阶段完成之后和下一个计算开始之前发生(例如在.map
s 或.flatMap
s 之间)。
如果您不害怕添加外部部门,还有其他(更可靠,因为 Thread.sleep
只是暂时的丑陋黑客)方法来超时 Future,例如 Akka utils。另见其他问题like this。
【讨论】:
是的,这就是我所说的native
超时。您需要阻止整个线程来触发“超时”异常。我认为这不应该在生产系统中运行。这可能会耗尽您的执行上下文,并且业务逻辑将停止工作,因为它等待太多超时。
我同意这就是我添加免责声明的原因(如果不允许我使用 Akka 或 Cats IO 或其他什么)。可能大部分时间都可以,除非你会产生太多等待线程。但是 Akka 及其 Scheduler
应该能够帮助处理这种情况,类似于 IO monads。但我希望这些问题主要来自那些坚持原始Future
s 并且由于公司/项目的政策而无法迁移到其他任何东西的人。
@MateuszKubuszok 感谢您的解决方案,但就像您说的那样,为我的每个未来创建另一个线程似乎效率很低。我可以集成任何其他依赖项,所以如果你能提供一个最有效和最简单的外部库解决方案,那将是最受欢迎的!
如果您想要较少侵入性的更改,请查看akka.pattern.after
。或任何支持IO.racePair
和timer.sleep(..)
之类的IO monad。【参考方案2】:
这是使用monix的解决方案
import monix.eval.Task
import monix.execution.Scheduler
val timeoutScheduler = Scheduler.singleThread("timeout") //it's safe to use single thread here because timeout tasks are very fast
def sequenceDiscardTimeouts[T](tasks: Task[T]*): Task[Seq[T]] =
Task
.parSequence(
tasks
.map(t =>
t.map(Success.apply) // Map to success so we can collect the value
.timeout(500.millis)
.executeOn(timeoutScheduler) //This is needed to run timesouts in dedicated scheduler that won't be blocked by "blocking"/io work if you have any
.onErrorRecoverWith ex =>
println("timed-out")
Task.pure(Failure(ex)) //It's assumed that any error is a timeout. It's possible to "catch" just timeout exception here
)
)
.map res =>
res.collect case Success(r) => r
测试代码
implicit val mainScheduler = Scheduler.fixedPool(name = "main", poolSize = 10)
def slowTask(msg: String) =
Task.sleep(Random.nextLong(1000).millis) //Sleep here to emulate a slow task
.map _ =>
msg
val app = sequenceDiscardTimeouts(
slowTask("1"),
slowTask("2"),
slowTask("3"),
slowTask("4"),
slowTask("5"),
slowTask("6")
)
val started: Long = System.currentTimeMillis()
app.runSyncUnsafe().foreach(println)
println(s"Done in $System.currentTimeMillis() - started millis")
这将为每次运行打印不同的输出,但应该如下所示
timed-out
timed-out
timed-out
3
4
5
Done in 564 millis
请注意使用两个单独的调度程序。这是为了确保即使main
调度程序忙于业务逻辑也会触发超时。您可以通过减少主调度程序的poolSize
来测试它。
【讨论】:
以上是关于等待具有超时的期货序列,而不会在 TimeoutException 上失败的主要内容,如果未能解决你的问题,请参考以下文章