Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap
Posted
技术标签:
【中文标题】Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap【英文标题】:Scala : Futures with map, flatMap for IO/CPU bound tasks 【发布时间】:2017-06-27 14:40:26 【问题描述】:我知道在 Java 8 中,在流框架的 filter、map 等方法中启动长时间运行的任务并不是一个好主意,因为没有办法调整底层的 fork-join 池,这可能会导致延迟问题和饥饿。
现在,我的问题是,Scala 有这样的问题吗?我试图用谷歌搜索它,但我想我不能把这个问题放在一个谷歌可用的句子中。
假设我有一个对象列表,我想使用 forEach 将它们保存到数据库中,这会导致任何问题吗?我想这在 Scala 中不是问题,因为函数转换是该语言的基本构建块,但无论如何......
【问题讨论】:
【参考方案1】:如果您没有看到任何类型的 I/O 操作,那么使用期货可能会产生开销。
def add(x: Int, y: Int) = Future x + y
在 Future 构造函数中执行纯 CPU 绑定操作会使您的逻辑执行速度变慢,而不是更快。对它们进行映射和平面映射可能会加剧这个问题。
如果你想用一个常数/简单的计算来初始化一个Future
,你可以使用Future.successful()
。
但是所有阻塞 I/O,包括 SQL 查询,都可以用blocking
包装在Future
中
例如:
Future
DB.withConnection implicit connection =>
val query = SQL("select * from bar")
query()
应该这样做,
import scala.concurrent.blocking
Future
blocking
DB.withConnection implicit connection =>
val query = SQL("select * from bar")
query()
这个blocking
通知线程池这个任务正在阻塞。这允许池根据需要临时产生新的工人。这样做是为了防止在阻塞应用程序时出现饥饿。
线程池(默认为scala.concurrent.ExecutionContext.global
池)知道blocking
中的代码何时完成。(因为它是一个fork join 线程池)
因此,它会在空闲工作线程完成时移除它们,并且池将随着时间的推移收缩回其预期大小(默认为核心数)。
但是如果没有足够的内存来扩展线程池,这种情况也会适得其反。
所以对于你的场景,你可以使用
images.foreach(i =>
import scala.concurrent.blocking
Future
blocking
DB.withConnection implicit connection =>
val query = SQL("insert into .........")
query()
)
如果您正在执行大量阻塞 I/O,那么最好创建一个单独的线程池/执行上下文并在该池中执行所有阻塞调用。
参考文献:
scala-best-practices
demystifying-the-blocking-construct-in-scala-futures
希望这会有所帮助。
【讨论】:
请注意blocking
只是一个建议:***.com/questions/29068064/… 和***.com/questions/19681389/…以上是关于Scala:带有地图的期货,用于 IO/CPU 绑定任务的 flatMap的主要内容,如果未能解决你的问题,请参考以下文章