从主管重新启动后向参与者发送消息

Posted

技术标签:

【中文标题】从主管重新启动后向参与者发送消息【英文标题】:Send message to actor after restart from Supervisor 【发布时间】:2018-01-25 15:16:20 【问题描述】:

我正在使用 BackoffSupervisor 策略来创建一个必须处理某些消息的子角色。我想实现一个非常简单的重启策略,在异常情况下:

    孩子将失败消息传播给主管

    主管重新启动子进程并再次发送失败消息。

    主管重试 3 次后放弃

    Akka 持久性不是一种选择

到目前为止,我拥有的是这样的:

主管定义:

val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.seconds,
    maxBackoff = 2.seconds,
    randomFactor = 0.2 
  )
    .withSupervisorStrategy(
      OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) 
        case msg: MessageException => 
          println("caught specific message!")
          SupervisorStrategy.Restart
        
        case _: Exception => SupervisorStrategy.Restart
        case _              ⇒ SupervisorStrategy.Escalate
      )
)

val sup = context.actorOf(supervisor)


sup ! cmd

应该发送电子邮件但失败(抛出一些异常)并将异常传播回主管的子actor:

class SenderActor() extends Actor 

  def fakeSendMail():Unit =  
    Thread.sleep(1000)
    throw new Exception("surprising exception")
   

  override def receive: Receive = 
    case cmd: NewMail =>

      println("new mail received routee")
      try 
        fakeSendMail()
       catch 
        case t => throw MessageException(cmd, t)
      

  

在上面的代码中,我将任何异常包装到自定义类 MessageException 中,然后将其传播到 SupervisorStrategy,但是如何将其进一步传播到新的子类以强制重新处理?这是正确的方法吗?

编辑。我试图在 preRestart 钩子上向 Actor 重新发送消息,但不知何故没有触发钩子:

class SenderActor() extends Actor 

  def fakeSendMail():Unit =  
    Thread.sleep(1000)
    //    println("mail sent!")
    throw new Exception("surprising exception")
  

  override def preStart(): Unit = 
    println("child starting")
  


  override def preRestart(reason: Throwable, message: Option[Any]): Unit = 
    reason match 
      case m: MessageException => 
        println("aaaaa")
        message.foreach(self ! _)
      
      case _ => println("bbbb")
    
  

  override def postStop(): Unit = 
    println("child stopping")
  

  override def receive: Receive = 
    case cmd: NewMail =>

      println("new mail received routee")
      try 
        fakeSendMail()
       catch 
        case t => throw MessageException(cmd, t)
      

  

这给了我类似于以下输出的东西:

new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting

但是没有来自preRestart钩子的日志

【问题讨论】:

使用Backoff.onFailure,当BackoffSupervisor的子进程重启时,不会调用子进程的preRestart方法,因为underlying supervisor实际上是停止子进程,稍后再启动。 正是问题所在。有什么办法可以解决这个问题并且仍然可以重用退避主管? 【参考方案1】:

子的preRestart钩子没有被调用的原因是因为Backoff.onFailure在幕后使用BackoffOnRestartSupervisor,它用与退避一致的停止和延迟启动行为替换了默认的重启行为政策。换句话说,当使用Backoff.onFailure时,当一个child重新启动时,child的preRestart方法没有被调用,因为底层的supervisor实际上停止了child,然后稍后再启动它。 (使用Backoff.onStop 可以触发孩子的preRestart 钩子,但这与目前的讨论相切。)

BackoffSupervisor API 不支持在主管的孩子重新启动时自动重新发送消息:您必须自己实现此行为。重试消息的一个想法是让BackoffSupervisor 的主管处理它。例如:

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    ...
  ).withReplyWhileStopped(ChildIsStopped)
  ).withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) 
      case msg: MessageException =>
        println("caught specific message!")
        self ! Error(msg.cmd) // replace cmd with whatever the property name is
        SupervisorStrategy.Restart
      case ...
    )
)

val sup = context.actorOf(supervisor)

def receive = 
  case cmd: NewMail =>
    sup ! cmd
  case Error(cmd) =>
    timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
    // We assume that NewMail has an id field. Also, adjust the time as needed.
  case Replay(cmd) =>
    sup ! cmd
  case ChildIsStopped =>
    println("child is stopped")

在上面的代码中,嵌入在MessageException 中的NewMail 消息被包装在一个自定义案例类中(为了便于将其与“正常”/新的NewMail 消息区分开来)并发送到@987654337 @。在这种情况下,self 是创建 BackoffSupervisor 的参与者。然后,这个封闭的演员使用single timer 在某个时候重播原始消息。这个时间点在未来应该足够远,以至于BackoffSupervisor 可能会耗尽SenderActor 的重新启动尝试,以便孩子在收到重新发送的消息之前有足够的机会进入“良好”状态.显然这个例子只涉及一个消息重发,不管子重启的次数。


另一个想法是为每个NewMail 消息创建一个BackoffSupervisor-SenderActor 对,并让SenderActorpreStart 挂钩中将NewMail 消息发送给它自己。这种方法的一个问题是清理资源。即,当处理成功或孩子重新启动用尽时,关闭BackoffSupervisors(这将反过来关闭它们各自的SenderActor 孩子)。 NewMail ids 到 (ActorRef, Int) 元组的映射(其中 ActorRef 是对 BackoffSupervisor 演员的引用,Int 是重新启动尝试的次数)在这种情况下会有所帮助:

class Overlord extends Actor 

  var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long

  def receive = 
    case cmd: NewMail =>
      val childProps = Props(new SenderActor(cmd, self))
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          ...
        ).withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) 
            case msg: MessageException =>
              println("caught specific message!")
              self ! Error(msg.cmd)
              SupervisorStrategy.Restart
            case ...
          )
      )
      val sup = context.actorOf(supervisor)
      state += (cmd.id -> (sup, 0))

    case ProcessingDone(cmdId) =>
      state.get(cmdId) match 
        case Some((backoffSup, _)) =>
          context.stop(backoffSup)
          state -= cmdId
        case None =>
          println(s"$cmdId not found")
      

    case Error(cmd) =>
       val cmdId = cmd.id
       state.get(cmdId) match 
         case Some((backoffSup, numRetries)) =>
           if (numRetries == 3) 
             println(s"$cmdId has already been retried 3 times. Giving up.")
             context.stop(backoffSup)
             state -= cmdId
            else
             state += (cmdId -> (backoffSup, numRetries + 1))
         case None =>
           println(s"$cmdId not found")
       

    case ...
  

请注意,上面示例中的SenderActorNewMailActorRef 作为构造函数参数。后一个参数允许SenderActor 将自定义ProcessingDone 消息发送到封闭的actor:

class SenderActor(cmd: NewMail, target: ActorRef) extends Actor 
  override def preStart(): Unit = 
    println(s"child starting, sending $cmd to self")
    self ! cmd
  

  def fakeSendMail(): Unit = ...

  def receive = 
    case cmd: NewMail => ...
  

显然,SenderActor 在当前实现 fakeSendMail 时每次都会失败。我将在SenderActor 中留下实现快乐路径所需的额外更改,其中SenderActorProcessingDone 消息发送给target,给你。

【讨论】:

【参考方案2】:

在@chunjef 提供的良好解决方案中,他提醒在退避主管启动工人之前安排重新发送作业的风险

这个封闭的actor然后使用一个计时器在某个时间点重播原始消息。这个时间点在未来应该足够远,这样 BackoffSupervisor 可能会耗尽 SenderActor 的重新启动尝试,以便孩子在收到重新发送的消息之前有足够的机会进入“良好”状态。

如果发生这种情况,情况将是工作陷入僵局,并且不会有进一步的进展。 我用this scenario做了一个简化的小提琴。

因此,计划延迟应该大于 maxBackoff,这可能会影响作业完成时间。 避免这种情况的一个可能解决方案是让工人演员在准备好工作时向他的父亲发送消息,例如here。

【讨论】:

【参考方案3】:

失败的子actor可用作您的主管策略中的发送者。引用https://doc.akka.io/docs/akka/current/fault-tolerance.html#creating-a-supervisor-strategy:

如果策略是在监督参与者内部声明的(相反 在伴随对象内)它的决策者可以访问所有内部 以线程安全的方式显示actor的状态,包括获取一个 引用当前失败的孩子(可作为 失败消息)。

【讨论】:

“如果策略是在监督参与者内部声明的(而不是在伴随对象内)......” 这种情况下的策略没有在监督参与者内部声明演员,但在 BackoffOptions#withSupervisorStrategy 方法中。【参考方案4】:

在您的情况下,使用某些第三方软件发送电子邮件是一项危险的操作。为什么不应用Circuit Breaker 模式并完全跳过发送者参与者?此外,您仍然可以在其中包含一个演员(带有一些退避监督员)和断路器(如果这对您有意义的话)。

【讨论】:

以上是关于从主管重新启动后向参与者发送消息的主要内容,如果未能解决你的问题,请参考以下文章

主管不会在 econnrefused 上重新启动(在 init/1 中抛出)

重新启动 => erlang 主管的瞬态与永久

如何让主管重新启动挂起的工人?

Elixir - 基本主管设置崩溃而不是重新启动子进程

主管的所有孩子都死后会发生啥?

重新启动节点时如何使节点检查器重新启动?