scala actor 中的操作之间的依赖关系
Posted
技术标签:
【中文标题】scala actor 中的操作之间的依赖关系【英文标题】:Dependency between operations in scala actors 【发布时间】:2010-03-17 17:46:04 【问题描述】:我正在尝试使用 scala 演员并行化代码。这是我的第一个真正的演员代码,但我在 C 中使用 Java 多线程和 MPI 有一些经验。但是我完全迷路了。
我要实现的工作流是一个循环流水线,可以这样描述:
每个workeractor 都有对另一个actor 的引用,从而形成一个圆圈 有一个 coordinator actor 可以通过发送StartWork()
消息来触发计算
当worker收到StartWork()
消息时,它会在本地处理一些东西并将DoWork(...)
消息发送给它在圈子中的邻居。
邻居做一些其他事情,然后依次向自己的邻居发送DoWork(...)
消息。
这种情况一直持续到初始工作人员收到 DoWork()
消息。
协调者可以向初始工作人员发送GetResult()
消息并等待回复。
关键是协调器应该只在数据准备好时接收结果。 工人如何在回复GetResult()
消息之前等待作业返回?
为了加快计算速度,任何工作人员都可以随时收到StartWork()
。
这是我第一次尝试对 worker 的伪实现:
class Worker( neighbor: Worker, numWorkers: Int )
var ready = Foo()
def act()
case StartWork() =>
val someData = doStuff()
neighbor ! DoWork( someData, numWorkers-1 )
case DoWork( resultData, remaining ) => if( remaining == 0 )
ready = resultData
else
val someOtherData = doOtherStuff( resultData )
neighbor ! DoWork( someOtherData, remaining-1 )
case GetResult() => reply( ready )
在协调者方面:
worker ! StartWork()
val result = worker !? GetResult() // should wait
【问题讨论】:
【参考方案1】:首先,您显然需要对构成单件作品的内容有一些标识符,以便GetResult
可以得到正确的结果。我想显而易见的解决方案是让你的演员保留一个 Map
的结果和一个 Map
任何等待的getter:
class Worker( neighbor: Worker, numWorkers: Int )
var res: Map[Long, Result] = Map.empty
var gets: Map[Long, OutputChannel[Any]] = Map.empty
def act()
...
case DoWork( id, resultData, remaining ) if remaining == 0 =>
res += (id -> resultData)
gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
gets -= id //clear out getter map now?
case GetResult(id) if res.isDefinedAt(d) => //result is ready
reply (res(id))
case GetResult(id) => //no result ready
gets += (id -> sender)
注意:在匹配条件中使用if
可以使消息处理更清晰一些
【讨论】:
感谢您的回答。我会尽快尝试。顺便说一句,我认为=>
之后的 if 在这种情况下是正确的。我不是在匹配参数时寻找守卫,但我想根据一个值有两种不同的行为。也许我应该使用两个 case
条目,带有不同的警卫。
哦,是的。所以它是 - 我正在阅读=>
去别的地方【参考方案2】:
另一种选择是:
class Worker( neighbor: Worker, numWorkers: Int )
var ready = Foo()
def act()
case StartWork() =>
val someData = doStuff()
neighbor ! DoWork( someData, numWorkers-1 )
case DoWork( resultData, remaining ) => if( remaining == 0 )
ready = resultData
react
case GetResult() => reply( ready )
else
val someOtherData = doOtherStuff( resultData )
neighbor ! DoWork( someOtherData, remaining-1 )
工作完成后,这个worker会一直卡住,直到收到GetResult
消息。另一方面,协调者可以立即发送GetResult
,因为它将一直保留在邮箱中,直到工作人员收到为止。
【讨论】:
真的很好。我没有意识到可以嵌入反应块。但是,这不是我的问题的解决方案,因为(如果我理解正确的话)工人将被困在内部react
等待 GetResult()
并且将无法成为管道的一部分。
@paradigmatic 只有当结果准备好时它才会停止等待GetResult
,但我的意思是真正表明你可以级联反应。以上是关于scala actor 中的操作之间的依赖关系的主要内容,如果未能解决你的问题,请参考以下文章