使用返回 Future 的函数映射 Stream

Posted

技术标签:

【中文标题】使用返回 Future 的函数映射 Stream【英文标题】:mapping a Stream with a function returning a Future 【发布时间】:2013-08-05 07:25:28 【问题描述】:

我有时会发现自己有一些 Stream[X]function X => Future Y,我想将它们合并到 Future[Stream[Y]] 中,但我似乎找不到办法.例如,我有

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

我试过了

 val result = Future.Traverse(x, toFutureString)

它给出了正确的结果,但似乎在返回 Future 之前消耗了整个流,这或多或少地失败了

我试过了

val result = x.flatMap(toFutureString)

但这不能用type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]编译

val result = x.map(toFutureString)

返回有点奇怪和无用的Stream[Future[String]]

我应该怎么做才能解决问题?

编辑:我没有卡在Stream 上,我同样会对Iterator 上的相同操作感到满意,只要它在开始处理头部之前不会阻止评估所有项目

Edit2:我不能 100% 确定 Future.Traverse 构造是否需要在返回 Future[Stream] 之前遍历整个流,但我认为确实如此。如果没有,这本身就是一个很好的答案。

Edit3:我也不需要结果是有序的,无论返回的流或迭代器是什么顺序,我都可以。

【问题讨论】:

请注意,我已提交an issue 以跟进我在下面的回答。 啊,太棒了@TravisBrown。我想自己做,但我找不到登录 Jira 的方法 有点不清楚 - 你想避免将“toFutureString”应用于集合中的所有元素......?似乎简单地创建一个未来不应该有太多的开销。如果“列表”中的其余项目是 thunk,什么会触发它们的评估?完成未来列表中的上一个?我可以在 Scala 中找到的所有序列/遍历操作似乎对单个列表元素都很严格。 @pdxleif 好问题,我想这基本上就是我的问题归结为。我知道我只想要一个[Future[Stream[String]],但我不知道每次评估应该如何以及何时进行(如果我这样做了,我不会问;)。在我看来,thunk 和 Future 是(可能是错误的)相似的东西,所以我想我想加入或组合它们? 我认为它们很相似。 Travis 提到的 Scalaz 遍历确实立即为您提供了 Future[Stream[String]],其中(单个)Future 直到整个 Stream 必须应用 toFutureStream 并运行完成后才会完成。我想我对用例有点好奇——通常当我有一系列涉及等待的 IO 计算时​​,它们之间没有数据依赖关系,我想以并行方式急切地开始它们的执行。 Stream 的 scalaz 遍历实例似乎给出了与折叠带有 .flatMap 的常规列表所获得的效果相似的效果。 【参考方案1】:

你在traverse 的正确轨道上,但不幸的是,在这种情况下标准库的定义看起来有点错误——它不应该在返回之前消耗流。

Future.traverse 是一个更通用的函数的特定版本,它适用于包装在“可遍历”类型中的任何应用函子(例如,请参阅thesepapers 或我的答案here 以获取更多信息) .

Scalaz 库提供了这个更通用的版本,它在这种情况下按预期工作(请注意,我从scalaz-contrib 获得了Future 的应用函子实例;它还不是稳定版本Scalaz,它仍然与 Scala 2.9.2 交叉构建,没有这个 Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

这会立即在无限流上返回,因此我们确定它不会首先被消耗。


作为脚注:如果您查看 Future.traverse 的 the source,您会发现它是根据 foldLeft 实现的,这很方便,但在流的情况下不是必需或不合适的。

【讨论】:

仅供参考,Scala 2.9.3 包含 scala.concurrent @ViktorKlang:是的,这些实例很快就会出现are coming to Scalaz core,但据我所知还没有具体的时间表。 在 Scalaz 7.2.27 中,示例代码根本不返回,导致 java.lang.***Exception【参考方案2】:

忘记流:

import scala.concurrent.Future
import ExecutionContext.Implicits.global

val x = 1 to 10 toList
def toFutureString(value : Int) = Future 
  println("starting " + value)
  Thread.sleep(1000)
  println("completed " + value)
  value.toString

产量(在我的 8 芯盒子上):

scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@2d9472e2

scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10

所以它们中的 8 个立即启动(每个内核一个,尽管可以通过线程池执行程序进行配置),然后随着这些完成更多的启动。 Future[List[String]] 立即返回,然后在暂停后开始打印那些“已完成的 x”消息。

当您有一个 List[Url's] 和一个 Url => Future[HttpResponseBody] 类型的函数时,可以使用此示例。您可以使用该函数在该列表上调用 Future.traverse,并并行启动这些 http 请求,返回一个作为结果列表的未来。

是不是和你想要的一样?

【讨论】:

我猜“不想急切地评估流元素”似乎与并行性不一致,因为这些元素是启动 Futures 的任务的输入。您希望如何使用/评估该 Stream?【参考方案3】:

接受的答案不再有效,因为现代版本的 Scalaz traverse() 行为不同,并试图在调用时消耗整个流。

关于这个问题,我想说的是,不可能以真正的非阻塞方式实现这一点。

Stream[Y] 可用之前,无法解析Future[Stream[Y]]。而且由于Y 是由函数X => Future[Y] 异步生成的,因此在遍历Stream[Y] 时,如果不阻塞就无法获得Y。这意味着要么必须在解析 Future[Stream[Y]] 之前解析所有 Future[Y](这需要消耗整个流),要么在遍历 Stream[Y] 时必须允许出现块(在基础期货尚未完成的项目上) . 但是,如果我们允许阻塞遍历,那么结果未来的完成的定义是什么?从这个角度来看,它可能与Future.successful(BlockingStream[Y]) 相同。这又在语义上等于原始的Stream[Future[Y]]

换句话说,我认为问题本身存在问题。

【讨论】:

以上是关于使用返回 Future 的函数映射 Stream的主要内容,如果未能解决你的问题,请参考以下文章

Java 8 Streams 中的过滤器映射

在 Scala 期货中,我应该让我的函数返回 Future 还是返回 Try?

Java 8 Streams:根据不同的属性多次映射同一个对象

Flutter Future:构建函数返回 null

Future 异步函数的返回值问题

future.then 块在返回类型整数的函数内跳过导致返回 null