使用返回 Future 的函数映射 Stream
Posted
技术标签:
【中文标题】使用返回 Future 的函数映射 Stream【英文标题】:mapping a Stream with a function returning a Future 【发布时间】:2013-08-05 07:25:28 【问题描述】:我有时会发现自己有一些 Stream[X]
和 function X => Future Y
,我想将它们合并到 Future[Stream[Y]]
中,但我似乎找不到办法.例如,我有
val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)
val result : Future[Stream[String]] = ???
我试过了
val result = Future.Traverse(x, toFutureString)
它给出了正确的结果,但似乎在返回 Future 之前消耗了整个流,这或多或少地失败了
我试过了
val result = x.flatMap(toFutureString)
但这不能用type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]
编译
val result = x.map(toFutureString)
返回有点奇怪和无用的Stream[Future[String]]
我应该怎么做才能解决问题?
编辑:我没有卡在Stream
上,我同样会对Iterator
上的相同操作感到满意,只要它在开始处理头部之前不会阻止评估所有项目
Edit2:我不能 100% 确定 Future.Traverse 构造是否需要在返回 Future[Stream] 之前遍历整个流,但我认为确实如此。如果没有,这本身就是一个很好的答案。
Edit3:我也不需要结果是有序的,无论返回的流或迭代器是什么顺序,我都可以。
【问题讨论】:
请注意,我已提交an issue 以跟进我在下面的回答。 啊,太棒了@TravisBrown。我想自己做,但我找不到登录 Jira 的方法 有点不清楚 - 你想避免将“toFutureString”应用于集合中的所有元素......?似乎简单地创建一个未来不应该有太多的开销。如果“列表”中的其余项目是 thunk,什么会触发它们的评估?完成未来列表中的上一个?我可以在 Scala 中找到的所有序列/遍历操作似乎对单个列表元素都很严格。 @pdxleif 好问题,我想这基本上就是我的问题归结为。我知道我只想要一个[Future[Stream[String]]
,但我不知道每次评估应该如何以及何时进行(如果我这样做了,我不会问;)。在我看来,thunk 和 Future 是(可能是错误的)相似的东西,所以我想我想加入或组合它们?
我认为它们很相似。 Travis 提到的 Scalaz 遍历确实立即为您提供了 Future[Stream[String]],其中(单个)Future 直到整个 Stream 必须应用 toFutureStream 并运行完成后才会完成。我想我对用例有点好奇——通常当我有一系列涉及等待的 IO 计算时,它们之间没有数据依赖关系,我想以并行方式急切地开始它们的执行。 Stream 的 scalaz 遍历实例似乎给出了与折叠带有 .flatMap 的常规列表所获得的效果相似的效果。
【参考方案1】:
你在traverse
的正确轨道上,但不幸的是,在这种情况下标准库的定义看起来有点错误——它不应该在返回之前消耗流。
Future.traverse
是一个更通用的函数的特定版本,它适用于包装在“可遍历”类型中的任何应用函子(例如,请参阅thesepapers 或我的答案here 以获取更多信息) .
Scalaz 库提供了这个更通用的版本,它在这种情况下按预期工作(请注意,我从scalaz-contrib
获得了Future
的应用函子实例;它还不是稳定版本Scalaz,它仍然与 Scala 2.9.2 交叉构建,没有这个 Future
):
import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._
import ExecutionContext.Implicits.global
def toFutureString(value: Int) = Future(value.toString)
val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString
这会立即在无限流上返回,因此我们确定它不会首先被消耗。
作为脚注:如果您查看 Future.traverse
的 the source,您会发现它是根据 foldLeft
实现的,这很方便,但在流的情况下不是必需或不合适的。
【讨论】:
仅供参考,Scala 2.9.3 包含 scala.concurrent @ViktorKlang:是的,这些实例很快就会出现are coming to Scalaz core,但据我所知还没有具体的时间表。 在 Scalaz 7.2.27 中,示例代码根本不返回,导致java.lang.***Exception
【参考方案2】:
忘记流:
import scala.concurrent.Future
import ExecutionContext.Implicits.global
val x = 1 to 10 toList
def toFutureString(value : Int) = Future
println("starting " + value)
Thread.sleep(1000)
println("completed " + value)
value.toString
产量(在我的 8 芯盒子上):
scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@2d9472e2
scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10
所以它们中的 8 个立即启动(每个内核一个,尽管可以通过线程池执行程序进行配置),然后随着这些完成更多的启动。 Future[List[String]] 立即返回,然后在暂停后开始打印那些“已完成的 x”消息。
当您有一个 List[Url's] 和一个 Url => Future[HttpResponseBody] 类型的函数时,可以使用此示例。您可以使用该函数在该列表上调用 Future.traverse,并并行启动这些 http 请求,返回一个作为结果列表的未来。
是不是和你想要的一样?
【讨论】:
我猜“不想急切地评估流元素”似乎与并行性不一致,因为这些元素是启动 Futures 的任务的输入。您希望如何使用/评估该 Stream?【参考方案3】:接受的答案不再有效,因为现代版本的 Scalaz traverse()
行为不同,并试图在调用时消耗整个流。
关于这个问题,我想说的是,不可能以真正的非阻塞方式实现这一点。
在Stream[Y]
可用之前,无法解析Future[Stream[Y]]
。而且由于Y
是由函数X => Future[Y]
异步生成的,因此在遍历Stream[Y]
时,如果不阻塞就无法获得Y
。这意味着要么必须在解析 Future[Stream[Y]]
之前解析所有 Future[Y]
(这需要消耗整个流),要么在遍历 Stream[Y]
时必须允许出现块(在基础期货尚未完成的项目上) .
但是,如果我们允许阻塞遍历,那么结果未来的完成的定义是什么?从这个角度来看,它可能与Future.successful(BlockingStream[Y])
相同。这又在语义上等于原始的Stream[Future[Y]]
。
换句话说,我认为问题本身存在问题。
【讨论】:
以上是关于使用返回 Future 的函数映射 Stream的主要内容,如果未能解决你的问题,请参考以下文章
在 Scala 期货中,我应该让我的函数返回 Future 还是返回 Try?