scala actor 中的操作之间的依赖关系

Posted

技术标签:

【中文标题】scala actor 中的操作之间的依赖关系【英文标题】:Dependency between operations in scala actors 【发布时间】:2010-03-17 17:46:04 【问题描述】:

我正在尝试使用 scala 演员并行化代码。这是我的第一个真正的演员代码,但我在 C 中使用 Java 多线程和 MPI 有一些经验。但是我完全迷路了。

我要实现的工作流是一个循环流水线,可以这样描述:

每个workeractor 都有对另一个actor 的引用,从而形成一个圆圈 有一个 coordinator actor 可以通过发送 StartWork() 消息来触发计算 当worker收到StartWork()消息时,它会在本地处理一些东西并将DoWork(...)消息发送给它在圈子中的邻居。 邻居做一些其他事情,然后依次向自己的邻居发送DoWork(...) 消息。 这种情况一直持续到初始工作人员收到 DoWork() 消息。 协调者可以向初始工作人员发送GetResult() 消息并等待回复。

关键是协调器应该只在数据准备好时接收结果。 工人如何在回复GetResult() 消息之前等待作业返回?

为了加快计算速度,任何工作人员都可以随时收到StartWork()

这是我第一次尝试对 worker 的伪实现:

class Worker( neighbor: Worker, numWorkers: Int ) 
   var ready = Foo()
   def act() 
     case StartWork() =>  
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       
     case DoWork( resultData, remaining ) => if( remaining == 0 ) 
         ready = resultData
        else 
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      
    case GetResult() => reply( ready )
  

在协调者方面:

worker ! StartWork()
val result = worker !? GetResult() // should wait

【问题讨论】:

【参考方案1】:

首先,您显然需要对构成单件作品的内容有一些标识符,以便GetResult 可以得到正确的结果。我想显而易见的解决方案是让你的演员保留一个 Map 的结果和一个 Map 任何等待的getter

class Worker( neighbor: Worker, numWorkers: Int ) 
   var res: Map[Long, Result] = Map.empty
   var gets: Map[Long, OutputChannel[Any]] = Map.empty   
   def act() 
     ...
     case DoWork( id, resultData, remaining ) if remaining == 0 =>
       res += (id -> resultData)
       gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
       gets -= id //clear out getter map now?
     case GetResult(id) if res.isDefinedAt(d) => //result is ready
       reply (res(id))
     case GetResult(id) => //no result ready 
       gets += (id -> sender)
   

注意:在匹配条件中使用if可以使消息处理更清晰一些

【讨论】:

感谢您的回答。我会尽快尝试。顺便说一句,我认为 => 之后的 if 在这种情况下是正确的。我不是在匹配参数时寻找守卫,但我想根据一个值有两种不同的行为。也许我应该使用两个 case 条目,带有不同的警卫。 哦,是的。所以它是 - 我正在阅读=> 去别的地方【参考方案2】:

另一种选择是:

class Worker( neighbor: Worker, numWorkers: Int ) 
   var ready = Foo()
   def act() 
     case StartWork() =>  
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       
     case DoWork( resultData, remaining ) => if( remaining == 0 ) 
         ready = resultData
         react 
           case GetResult() => reply( ready )
         
        else 
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      
  

工作完成后,这个worker会一直卡住,直到收到GetResult消息。另一方面,协调者可以立即发送GetResult,因为它将一直保留在邮箱中,直到工作人员收到为止。

【讨论】:

真的很好。我没有意识到可以嵌入反应块。但是,这不是我的问题的解决方案,因为(如果我理解正确的话)工人将被困在内部 react 等待 GetResult() 并且将无法成为管道的一部分。 @paradigmatic 只有当结果准备好时它才会停止等待GetResult,但我的意思是真正表明你可以级联反应。

以上是关于scala actor 中的操作之间的依赖关系的主要内容,如果未能解决你的问题,请参考以下文章

scala当中的Actor并发编程

Oracle:如何循环并查找表之间的依赖/依赖关系,以便为每个表执行插入/更新操作?

Scala学习之路 Scala的Actor

Scala依赖注入

类之间的依赖关系和组合关系

类之间的关系