AkkaAkka Actor生命周期

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AkkaAkka Actor生命周期相关的知识,希望对你有一定的参考价值。

1.概述

转载:Akka Actor生命周期 用于自我学习。

Actor的生命周期是使用Hooks体现和控制的,我们可以重写相关的Hooks,从而实现对Actor生命周期各环节的细粒度控制。各事件发生顺序关系如下:

prestart():在构造函数之后调用。
postStop():在重启之前调用。
preRestart(reason, message):默认情况下会调用 postStop()postRestart():默认情况下会调用 preStart()

注意: preRestart 和 postRestart 只在重启的时候才会被调用。它们默认调用了 preStartpostStop,但是调用它们的时候就不再直接调用 preStartpostStop 了。 这样我们就能够决定, 到底是只在 Actor 启动或停止的时候调用一次 preStart 和postStop,还是每次重启一个 Actor 的时候就调用 preStart 和postStop。

从上图我们可以发现Actor的生命周期主要包含三个状态:启动、停止和重启

2.启动Start策略

Start策略一般用于初始化资源,调用preStart Hook。当Akka通过Props构建一个Actor后,会调用构造函数,之后调用preStart。


override def preStart=
    log.info ("Starting storage actor...")
    initDB
  

3.停止Stop策略

Stop策略一般用于回收资源,一个Actor可能因为完成运算、发生异常又或者人为通过发送Kill,PoisonPill强行终止等而进入停止(stopping)状态。而这个终止过程分为两步:

  • Actor将挂起对邮箱的处理,并向所有子Actor发送终止命令,然后处理来自子Actor的终止消息直到所有的子Actor都完成终止。
  • 终止自己,调用postStop方法,清空邮箱,向DeathWatch发布Terminated,通知其监管者。

整个人过程保证Actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个Actor没有响应(即由于处理消息用了太长时间以至于没有收到终止命令),整个过程将会被阻塞。

override def postStop=
  
   log.info ("Stopping storage actor...")
   db.release
 

4.重启Restart策略

Restart策略是最为复杂的一种情况,在一个Actor的生命周期里可能因为多种原因发生重启。造成一个Actor需要重启的原因可能有下面几个:

  1. 在处理某特定消息时造成了系统性的异常,必须通过重启来清理系统错误;

  2. 内部状态毁坏,必须通过重启来重新构建状态;

  3. 在处理消息时无法使用到一些依赖资源,需要重启来重新配置资源;

Actor的重启过程也是一个递归的过程,由于其过程比较复杂,先上个图:


在默认情况下 ,重启过程主要分为以下几步:

  1. actor被挂起;
  2. 调用旧实例的supervisionStrategy.handleSupervisorFailing方法(缺省实现为挂起所有的子actor);
  3. 调用preRestart方法,从上面的源码可以看出来,preRestart方法将所有的children stop掉了!(Stop动作,大家注意!),并调用postStop回收资源;
  4. 调用旧实例的supervisionStrategy.handleSupervisorRestarted方法(缺省实现为向所有剩下的子actor发送重启请求);
  5. 等待所有子actor终止直到preRestart最终结束;
  6. 再次调用之前提供的actor工厂创建新的actor实例;
  7. 对新实例调用postRestart;
  8. 恢复运行新的actor;

缺省实现:比如说构造方法不带参数的,就是一个缺省实现。

4.1 常见疑难点

Restart策略,和Stop策略有什么不同的地方?

Stop策略会调用postStop(),Restart策略也会调用postStop(),但是Restart策略不是通过Stop策略来停止旧的Actor,UID和Path都没变。也就是说,在被Restart之后,不用重新获取ActorRef

Actor是由UID和Path来唯一标识的,也就是说ActorRef也是通过UID和Path来定位。在Actor被Stop之后,新的Actor是可以用这个Path的,但是旧的ActorRef是不能用的,因为UID不一样。

preRestart Hook有什么特别之处?

默认的preRestart Hook会将所有的Children通过Stop策略停止,这个时候Children就是通过Stop策略->Start策略启动的,而不是被递归Restart。如果有外部的Actor持有旧的Chidren ActorRef,那这个Ref就是不能用的,因为虽然Path是对的,但是UID已经变了!

postRestart Hook有什么特别之处?

默认postRestart是调用preStart(),这样在重启的过程中,构造函数和preStart方法都会被重新调用,如果有个资源只想初始化一次,那么就必须重写掉这个方法。所以一般创建children是放在preStart里面。

override def preStart(): Unit = 
// 初始化children


// 重写postRestart防止preStart每次重启都被调用
override def postRestart(reason: Throwable): Unit = ()


override def preRestart(reason: Throwable, message: Option[Any]): Unit = 
  // 任然要清理自己,但是不Stop children
  postStop()

5.Actor生命周期测试代码

ParentActor.scala :接收从 Main.scala 里发送的消息,初始化或控制 ChildActor。

import akka.actor.ActorActorLoggingPropsReceiveTimeout

/**
 * @author li.pan
 * @version 1.0.0
 * @Description 父Actor
 * @createTime 2021年01月13日 13:04:00
 */
class ParentActor extends Actor with ActorLogging 
  println("start pActor ")

  def receive = 
    case "test" => log.info("received test")
    case ("newChild", name: String) => context.actorOf(Props[ChildActor], name)
    case ("stop", name: String) => 
      val child = context.actorFor(self.path + "/" + name);
      context.stop(child)
    
    case ReceiveTimeout => throw new RuntimeException("received timeout"); // 每隔超时时间没收到消息就抛出异常
    case "suicide" =>
    case x: Any => log.info("received unknown message :" + x)
  

  /**
   * 在 actor 实例化后执行,重启时不会执行
   */
  override def preStart 
    println("actor:" + self.path + ", parent preStart ")
  

  /**
   * 在 actor 正常终止后执行,异常重启时不会执行。
   */
  override def postStop 
    println("actor:" + self.path + ",parent postStop .")
  

  /**
   * 在 actor 异常重启前保存状态
   */
  override def preRestart(reason: Throwable, message: Option[Any]) 
    println("actor:" + self.path + ", preRestart parent, reason:" + reason + ", message:" + message)
  

  /**
   * 在 actor 异常重启后恢复状态
   */
  override def postRestart(reason: Throwable) 
    println("actor:" + self.path + ", postRestart parent, reason:" + reason)
  

ChildActor.scala:子 Actor,被 ParentActor 控制。

ackage com.lp.akka.notes.lifecycle

import akka.actor.Actor

/**
 * @author li.pan
 * @version 1.0.0
 * @Description 子 Actor
 * @createTime 2021年01月13日 13:03:00
 */
class ChildActor extends Actor 

  override def receive() = 
    case "abc" => println("get abc string ")
    case "exception" => throw new NullPointerException()
    case _ => println("children cann't handle unknown message")
  

  override def preStart 
    println("actor:" + self.path + ",child preStart .")
  

  override def postStop 
    println("actor:" + self.path + ",child postStop .")
  

  override def preRestart(reason: Throwable, message: Option[Any]) 
    println("actor:" + self.path + ",preRestart child, reason:" + reason + ", message:" + message)
  

  override def postRestart(reason: Throwable) 
    println("actor:" + self.path + ",postRestart child, reason:" + reason)
  

LifeCycleMainApp:初始化、发送消息给Actor。

package com.lp.akka.notes.lifecycle

import akka.actor.ActorSystemKillProps

/**
 * @author li.pan
 * @version 1.0.0
 * @Description 主类
 * @createTime 2021年01月13日 13:05:00
 */
object LifeCycleMainApp extends App 

  // 构建Actor系统
  val system = ActorSystem("lpLocalSys")

  // 通过Props方式创建父Actor
  val pActor = system.actorOf(Props[ParentActor], name = "pActor")

  pActor ! ("newChild""child-1")
  pActor ! ("newChild""child-2")
  pActor ! "test"

  val parent = system.actorSelection("akka://lpLocalSys/user/pActor")
  parent ! "test"
  //    parent ! ("stop", "child-2")

  val child2 = system.actorSelection("akka://lpLocalSys/user/pActor/child-2")
  child2 ! Kill // 杀死 child2
  //    child2 ! "exception"
  Thread.sleep(5000) // 等待 child2  被杀死
  pActor ! ("newChild""child-2")
  //    Thread.sleep(5000)
  //    myActor ! ("newChild", "child-2")

运行结果:


在 LifeCycleMainApp 里,如果不等的 child-2 被杀死就创建同名的 actor将导致名为 myactor 的父 actor 异常,使它重启,而根据 Akka Actor 的监管策略,它也会重启它的子 Actor,所以 child-1 和child-2 也会被重启,输出如下

从输出可以看到:父 Actor 首先调用 preRestart ,然后被实例化,再调用 postRestart,最后再重启它的子 Actor,子 Actor 也遵循上述的步骤。

以上是关于AkkaAkka Actor生命周期的主要内容,如果未能解决你的问题,请参考以下文章

Scala框架Akka学习

AkkaAkka容错处理

AkkaAkka 传递消息

(转载)Actor 生命周期

akkaakka源码 Akka源码分析-FSM

1.Actor编写-ESGrain与ESRepGrain