Scala 期货 - 如何在完成时结束?

Posted

技术标签:

【中文标题】Scala 期货 - 如何在完成时结束?【英文标题】:Scala futures - how to end on completion? 【发布时间】:2021-09-21 23:54:52 【问题描述】:

我从一位前同事那里继承了一些代码,他开始使用期货(在 Scala 中)来处理 Databricks 中的一些数据。

我将它分成在相似时间段内完成的块。但是没有输出,我知道他们没有使用 onSuccess 或 Await 或任何东西。

问题是,代码完成运行(不返回输出)但 Databricks 中的块一直执行到 thread.sleep() 部分。

我是 Scala 和期货的新手,我不确定如何在所有期货完成运行后退出笔记本(我应该在未来块之后使用 dbutils.notebook.exit() 吗?)

代码如下:

import scala.concurrent.Future, blocking, Await
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.WorkflowException

val numNotebooksInParallel = 15
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once. 
  // This code limits the number of parallel notebooks.

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  
  val ctx = dbutils.notebook.getContext()
  // The simplest interface we can have but doesn't
  // have protection for submitting to many notebooks in parallel at once

println("starting parallel jobs... hang tight")

Future 
  
        process("pro","bseg")

        process("prc","bkpf")

        process("prc","bseg")
        
        process("pr4","bkpf")

        process("pr4","bseg")

        println("done with future1")

      
Future 
        
        process("pr5","bkpf")
        
        process("pr5","bseg")
        
        process("pri","bkpf") 
        
        process("pri","bseg")
        
        process("pr9","bkpf")

        println("done with future2")
  
      
Future  
        
        process("pr9","bseg")
        
        process("prl","bkpf") 
        
        process("prl","bseg")
        
        process("pro","bkpf")

        println("done with future3")

      
  println("finished futures - yay! :)")

  Thread.sleep(5*60*60*1000)
  println("thread timed out after 5 hrs... hope it all finished.")

【问题讨论】:

【参考方案1】:

通常会将期货保存为值:

val futs = Seq(
  Future 
    process("pro","bseg")
    // and so on
  ,
  // then the other futures
)

然后对期货进行操作:

import scala.concurrent.Await
import scala.concurrent.duration._

Await.result(Future.sequence(futs), 5.hours)

Future.sequence 将在第一个失败或全部成功后停止。如果您希望它们全部运行,即使其中一个失败,您可以执行类似的操作

Await.result(
  futs.foldLeft(Future.unit)  (_, f) =>
    f.recover 
      case _ => ()
    
  ,
  5.hours
)

【讨论】:

谢谢,我明天试试看效果如何! 看起来这正是我所需要的 - 谢谢

以上是关于Scala 期货 - 如何在完成时结束?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Scala 期货与超时联系起来?

如何将 Scala ARM 与期货一起使用?

在 Scala 期货中,我应该让我的函数返回 Future 还是返回 Try?

如何将自删除期货存储在列表中

如何使涉及期货尾递归的函数?

concurrent.futures.as_completed如何工作?