如何在 Scala 中优先考虑演员之间发送的消息?

Posted

技术标签:

【中文标题】如何在 Scala 中优先考虑演员之间发送的消息?【英文标题】:How to prioritize messages sent between actors in Scala? 【发布时间】:2015-09-11 07:44:44 【问题描述】:

我正在尝试从子角色到父角色捕获“终止信号”,但是在死信消息池中,信号无法到达父角色的队列。 解决此问题的最佳方法是什么?

这是我正在处理的 sn-p 代码:

class MinerActor extends Actor 
   var count:Int = 0
   def receive = 
       case Mine =>
            //some task here     
            //if success
               count = count + 1
            //
            if (count >= 100) 
            
                context.stop(self) 
            


class MasterActor extends Actor 
    val miner = context.actorOf(Props(new MinerActor,name = "miner")
    context.watch(miner)

    def receive = 
      case Foo => 
            while (true) 
              miner ! Mine
            

      case Terminated(miner) =>
            println("Miner Terminated!!")
            context.stop(self)
            context.system.shutdown
    

这里永远不会调用“终止(矿工)”案例。相反,在标准输出上,我看到很多从 Master 发送到 Miner 的死信消息(这是一种预期的矿工演员停止)。但是如何在 Event 总线上对 Terminate 信号进行优先级排序,以便到达 Master Actor?

如果我将 while 循环限制为大约 200 个而不是无穷大,则在 100 个死信消息之后,我会收到打印“矿工终止!!”的终止信号。但是当while循环无限大时如何实现呢?

我是 Scala/Akka 编程的新手,我的主要目标是成功运行 '//some task' 100 次,然后退出整个程序。这是完成这项任务的好方法吗?

【问题讨论】:

【参考方案1】:

替换:

case Foo => 
  while (true) 
    miner ! Mine
  

case Foo =>
  miner ! Mine
  self forward Foo

【讨论】:

成功了!感谢您展示“前进”功能。【参考方案2】:

问题是你无限的while循环阻塞了actor线程。因此,您的主要参与者总是卡在处理第一个到达的Foo 消息,并且永远不会处理邮箱中的任何其他消息。原因是只有一个线程负责接收消息。这有一些非常好的含义,因为如果您的状态更改仅发生在此线程中,您基本上不必担心单个参与者内的并发性。

有多种方法可以解决这个问题。我建议使用调度程序来安排重复任务。

class MasterActor extends Actor 
  var minerOption: Option[ActorRef] = None
  var mineMessageOption: Option[Cancellable] = None

  override def preStart: Unit = 
    minerOption = Some(context.actorOf(Props(new MinerActor,name = "miner")))

    minerOption.foreach(context.watch(_))

    import context.dispatcher

    mineMessageOption = Some(context.system.scheduler.schedule(0 seconds, 1 seconds, self, Foo))
  

  def receive = 
    case Foo =>
      minerOption.foreach 
        _ ! Mine
      

    case Terminated(miner) =>
      println("Miner Terminated!!")

      mineMessageOption.foreach(_.cancel())

      context.stop(self)
      context.system.shutdown
  

schedule 调用中,您可以定义消息Foo 的间隔,从而定义将多少条消息发送给矿工。

【讨论】:

调度,这是个好办法。作为 scala 的新手,我无法获得大部分代码,直觉上我可以理解这通过每秒安排一条消息来解决问题。但是我发现我可以通过将循环移动到“MinerActor”进行 1000 次迭代来修改代码,并向“Master”发送一条消息,如果 count != 100 则再次启动“miner”并终止程序。感谢您提供替代方法!

以上是关于如何在 Scala 中优先考虑演员之间发送的消息?的主要内容,如果未能解决你的问题,请参考以下文章

在 scala/akka 的计算之间检查参与者的消息查询

Python 和 Scala 程序之间的进程间通信

如何访问通过 guice 创建的 akka 系统?

如何在 Scala Akka 中停止 system.scheduler.schedule

scala演员和持久性上下文

Akka Scala 演员中的死信