Akka actor、Futures 和闭包
Posted
技术标签:
【中文标题】Akka actor、Futures 和闭包【英文标题】:Akka actors, Futures, and closures 【发布时间】:2012-06-23 04:57:03 【问题描述】:我在Akka docs 中读到,从封闭的actor 中关闭变量是很危险的。
警告
在这种情况下,您需要小心避免关闭 包含演员的引用,即不要在 在匿名 Actor 类中封装 Actor。这个会 破坏actor封装并可能引入同步错误 和竞争条件,因为其他参与者的代码将被安排 并发给封闭的演员。
现在,我有两个演员,其中一个向第二个请求一些东西,然后对结果做一些事情。在下面我整理的这个示例中,actor Accumulator 从actor NumberGenerator 中检索数字并将它们相加,同时报告总和。
这可以通过至少两种不同的方式完成,如本例所示,使用两个不同的 receive 函数(A vs B)。两者的区别在于 A 没有关闭 counter 变量;相反,它等待一个整数并将其相加,而 B 创建一个 Future 关闭 counter 并进行求和。如果我正确理解它的工作原理,这会发生在一个仅为处理 onSuccess 而创建的匿名参与者中。
import com.esotericsoftware.minlog.Log
import akka.actor.Actor, Props
import akka.pattern.ask, pipe
import akka.util.Timeout
import akka.util.duration._
case object Start
case object Request
object ActorTest
var wake = 0
val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")
Log.info("ActorTest", "Starting !")
accRef ! Start
class Accumulator extends Actor
var counter = 0
implicit val timeout = Timeout(5 seconds)
// A: WITHOUT CLOSURE
def receive =
case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
// B: WITH CLOSURE
def receive =
case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess
case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
class NumberGenerator extends Actor
val rand = new java.util.Random()
def receive =
case Request => sender ! rand.nextInt(11)-5
在这种情况下使用闭包绝对是邪恶的吗?当然,我可以使用 AtomicInteger 而不是 Int,或者在某些网络场景中使用 netty,在 threadsafe 通道上发出写入操作,但这不是我的意思。
冒着问可笑的风险:有没有办法让 Future 的 onSuccess 在 this 演员而不是匿名中间演员中执行,没有在接收函数?
编辑
更清楚地说,我的问题是:有没有办法强制一系列 Futures 与给定的 Actor 在同一个线程中运行?
【问题讨论】:
【参考方案1】:问题是onSuccess
将在与actor 的receive
将运行的线程不同的线程中运行。您可以使用pipeTo
方法,或使用Agent。将counter
设为AtomicInteger
可以解决问题,但它并不那么干净——也就是说,它破坏了Actor 模型。
【讨论】:
【参考方案2】:实现这种设计的最简单方法是使用“即发即弃”语义:
class Accumulator extends Actor
private[this] var counter = 0
def receive =
case Start => ActorTest.genRef ! Request
case x: Int =>
counter += x
Log.info("Accumulator", "counter = " + counter)
self ! Start
此解决方案是完全异步的,您不需要任何超时。
【讨论】:
是的,如果我放弃使用 Futures,这将有效。在我的示例中,如果它们以 pipeTo self 结尾,则链式 Futures 很容易实现,但使用“即发即弃”语义就不再可能了。相反,我必须在 Accumulator 的接收函数中定义 N 条中间消息,以保证代码在此 Actor 的线程中运行。我想我可以再问一次,这一次更清楚:有没有办法强制一系列 Futures 与给定的 Actor 在同一个线程中运行? 为什么要保证Accumulator
Actor 总是在同一个线程中运行?这似乎与演员模型哲学背道而驰。期货也是如此:它们应该在线程池上分派,以最大限度地提高性能。如果它们都在链中的同一个线程中运行,那么您只需一个简单的顺序程序,就不再需要期货了...
其实不管是不是在同一个线程中运行,更多的要求是顺序运行,就像处理单个actor的消息一样。这不违背演员模型,它是演员模型。
您可以使用演员链接Future
s:只需使用?
(ask
) 而不是!
(tell
):您将得到一个Future
回,这你可以map
和fold
over:doc.akka.io/docs/akka/2.4.0/scala/futures.html以上是关于Akka actor、Futures 和闭包的主要内容,如果未能解决你的问题,请参考以下文章