Akka actor、Futures 和闭包

Posted

技术标签:

【中文标题】Akka actor、Futures 和闭包【英文标题】:Akka actors, Futures, and closures 【发布时间】:2012-06-23 04:57:03 【问题描述】:

我在Akka docs 中读到,从封闭的actor 中关闭变量是很危险的。

警告

在这种情况下,您需要小心避免关闭 包含演员的引用,即不要在 在匿名 Actor 类中封装 Actor。这个会 破坏actor封装并可能引入同步错误 和竞争条件,因为其他参与者的代码将被安排 并发给封闭的演员。

现在,我有两个演员,其中一个向第二个请求一些东西,然后对结果做一些事情。在下面我整理的这个示例中,actor Accumulator 从actor NumberGenerator 中检索数字并将它们相加,同时报告总和。

这可以通过至少两种不同的方式完成,如本例所示,使用两个不同的 receive 函数(A vs B)。两者的区别在于 A 没有关闭 counter 变量;相反,它等待一个整数并将其相加,而 B 创建一个 Future 关闭 counter 并进行求和。如果我正确理解它的工作原理,这会发生在一个仅为处理 onSuccess 而创建的匿名参与者中。

import com.esotericsoftware.minlog.Log

import akka.actor.Actor, Props
import akka.pattern.ask, pipe
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest 
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start


class Accumulator extends Actor 
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  def receive = 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  
  // B: WITH CLOSURE
  def receive = 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess 
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    
  


class NumberGenerator extends Actor 
  val rand = new java.util.Random()

  def receive = 
    case Request => sender ! rand.nextInt(11)-5
  

在这种情况下使用闭包绝对是邪恶的吗?当然,我可以使用 AtomicInteger 而不是 Int,或者在某些网络场景中使用 netty,在 threadsafe 通道上发出写入操作,但这不是我的意思。

冒着问可笑的风险:有没有办法让 Future 的 onSuccess 在 this 演员而不是匿名中间演员中执行,没有接收函数?

编辑

更清楚地说,我的问题是:有没有办法强制一系列 Futures 与给定的 Actor 在同一个线程中运行?

【问题讨论】:

【参考方案1】:

问题是onSuccess 将在与actor 的receive 将运行的线程不同的线程中运行。您可以使用pipeTo 方法,或使用Agent。将counter 设为AtomicInteger 可以解决问题,但它并不那么干净——也就是说,它破坏了Actor 模型。

【讨论】:

【参考方案2】:

实现这种设计的最简单方法是使用“即发即弃”语义:

class Accumulator extends Actor 
  private[this] var counter = 0

  def receive = 
    case Start => ActorTest.genRef ! Request
    case x: Int => 
      counter += x
      Log.info("Accumulator", "counter = " + counter)
      self ! Start
    
  

此解决方案是完全异步的,您不需要任何超时。

【讨论】:

是的,如果我放弃使用 Futures,这将有效。在我的示例中,如果它们以 pipeTo self 结尾,则链式 Futures 很容易实现,但使用“即发即弃”语义就不再可能了。相反,我必须在 Accumulator 的接收函数中定义 N 条中间消息,以保证代码在此 Actor 的线程中运行。我想我可以再问一次,这一次更清楚:有没有办法强制一系列 Futures 与给定的 Actor 在同一个线程中运行? 为什么要保证Accumulator Actor 总是在同一个线程中运行?这似乎与演员模型哲学背道而驰。期货也是如此:它们应该在线程池上分派,以最大限度地提高性能。如果它们都在链中的同一个线程中运行,那么您只需一个简单的顺序程序,就不再需要期货了... 其实不管是不是在同一个线程中运行,更多的要求是顺序运行,就像处理单个actor的消息一样。这不违背演员模型,它演员模型。 您可以使用演员链接Futures:只需使用? (ask) 而不是! (tell):您将得到一个Future 回,这你可以mapfoldover:doc.akka.io/docs/akka/2.4.0/scala/futures.html

以上是关于Akka actor、Futures 和闭包的主要内容,如果未能解决你的问题,请参考以下文章

(转)Akka学习笔记

一 Akka学习 - actor

[scala] akka actor编程

Scala笔记整理:Actor和AKKA

Scala笔记整理:Actor和AKKA

二 Akka学习 - actor介绍