Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap

Posted

技术标签:

【中文标题】Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap【英文标题】:Scala : Futures with map, flatMap for IO/CPU bound tasks 【发布时间】:2017-06-27 14:40:26 【问题描述】:

我知道在 Java 8 中,在流框架的 filter、map 等方法中启动长时间运行的任务并不是一个好主意,因为没有办法调整底层的 fork-join 池,这可能会导致延迟问题和饥饿。

现在,我的问题是,Scala 有这样的问题吗?我试图用谷歌搜索它,但我想我不能把这个问题放在一个谷歌可用的句子中。

假设我有一个对象列表,我想使用 forEach 将它们保存到数据库中,这会导致任何问题吗?我想这在 Scala 中不是问题,因为函数转换是该语言的基本构建块,但无论如何......

【问题讨论】:

【参考方案1】:

如果您没有看到任何类型的 I/O 操作,那么使用期货可能会产生开销。

def add(x: Int, y: Int) = Future x + y

在 Future 构造函数中执行纯 CPU 绑定操作会使您的逻辑执行速度变慢,而不是更快。对它们进行映射和平面映射可能会加剧这个问题。

如果你想用一个常数/简单的计算来初始化一个Future,你可以使用Future.successful()

但是所有阻塞 I/O,包括 SQL 查询,都可以用blocking 包装在Future

例如:

Future DB.withConnection implicit connection => val query = SQL("select * from bar") query() 应该这样做,

import scala.concurrent.blocking Future blocking DB.withConnection implicit connection => val query = SQL("select * from bar") query()

这个blocking 通知线程池这个任务正在阻塞。这允许池根据需要临时产生新的工人。这样做是为了防止在阻塞应用程序时出现饥饿。

线程池(默认为scala.concurrent.ExecutionContext.global 池)知道blocking 中的代码何时完成。(因为它是一个fork join 线程池)

因此,它会在空闲工作线程完成时移除它们,并且池将随着时间的推移收缩回其预期大小(默认为核心数)。

但是如果没有足够的内存来扩展线程池,这种情况也会适得其反。

所以对于你的场景,你可以使用

images.foreach(i => 

  import scala.concurrent.blocking
   Future 
    blocking 
      DB.withConnection  implicit connection =>
      val query = SQL("insert into .........")
      query()
    
  
)

如果您正在执行大量阻塞 I/O,那么最好创建一个单独的线程池/执行上下文并在该池中执行所有阻塞调用。

参考文献:

scala-best-practices

demystifying-the-blocking-construct-in-scala-futures

希望这会有所帮助。

【讨论】:

请注意blocking 只是一个建议:***.com/questions/29068064/… 和***.com/questions/19681389/…

以上是关于Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap的主要内容,如果未能解决你的问题,请参考以下文章

期货未终止的Scala主类

为啥我的 scala 期货效率不高?

Scala 期货 - 如何在完成时结束?

Scala 中的异步 IO 与期货

Scala 捕获一系列期货引发的错误

scala-cats EitherT:链接期货