Akka之BackoffSupervisor

Posted junjiang3

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Akka之BackoffSupervisor相关的知识,希望对你有一定的参考价值。

一、背景

最近在开发一个项目,项目的各模块之间是使用akka grpc传输音频帧的,并且各模块中的actor分别都进行了persist。本周在开发过程中遇到了一个bug,就是音频帧在通行一段时间后,整个系统处于卡死状态,没有了反应。刚开始怀疑是akka grpc通信时,流中断了,或者没有传输过来,可是通过抓包和日志,发现流的每一帧已经接受到了。最后定位到问题时persist的原因,每次卡死之间都可以发现persit失败了。我们去看persist方法的源码可以发现上面的注解如下:

/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn‘t interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistent failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
def persist[A](event: A)(handler: A ? Unit): Unit = {
internalPersist(event)(handler)
}

从注解我们可以发现,我们在使用akka的persist特性时,如果持久化失败,相应的actor就会被stop掉。因此,如果你继续往它发消息是没有任何反应。并且这个注解建议我们使用Backoff来重启这个Actor。(建议,我们在把actor持久化到本地或者使用redis等插件持久化到数据库时,最好选择持久化的方法,不然会使用java持久化,会出现WARN,因为默认的java持久化效率不是很好)。因此,我们来学习一下,这种BackOff重启的方式。

二、普通的监控和重启

我们都知道Actor之间是有父子关系的,如果子Actor出现了异常,是会进行上报,并且使用策略来进行相应的处理,其中最常用的策略就是OneForOneStrategy,也就是只针对发生异常的Actor施用策略,策略中定义了对下属子级发生的各种异常的处理方式。而默认的处理方式是这样的:

final val defaultDecider: Decider = {
case _: ActorInitializationException ? Stop
case _: ActorKilledException ? Stop
case _: DeathPactException ? Stop
case _: Exception ? Restart
}

但是,我们可以加上自己的一些特殊处理方式,例如

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException => Resume
case _: MyException => Restart
case t =>
super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
}

三、BackoffSupervisor

但是,如果我们的actor是通过system.actorOf进行启动的,我们就很难通过上述这个方式去自定义自己的异常处理方式。并且,如果我们想进行个细粒度的控制,例如在actor发生异常以后多久去处理它。这种情况我们就可以使用BackoffSupervisor去实现。

我们可以这样理解BackoffSupervisor。实际上BackoffSupervisor与定义了supervisorStrategy的Actor有所不同。我们应该把BackoffSupervisor看作是一个一体化的Actor。当然,它的实现方式还是由一对父子Actor组成。监管策略(SupervisorStrategy)是在BackoffSupervisor的内部实现的。从外表上BackoffSupervisor就像是一个Actor,运算逻辑是在子级Actor中定义的,所谓的父级Actor除监管之外没有任何其它功能,我们甚至没有地方定义父级Actor的功能,它的唯一功能是转发收到的信息给子级,是嵌入BackoffSupervisor里的。所以我们虽然发送消息给BackoffSupervisor,但实际上是在与它的子级交流。下面我们可以通过一个例子来看看怎么使用:

object InnerChild {
case class TestMessage(msg: String)
class ChildException extends Exception

def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
import InnerChild._
override def receive: Receive = {
case TestMessage(msg) => //模拟子级功能
log.info(s"Child received message: ${msg}")
}
}
object Supervisor {
def props: Props = { //在这里定义了监管策略和child Actor构建
def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
case _: InnerChild.ChildException => SupervisorStrategy.Restart
}

val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0)
.withManualReset
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
decider.orElse(SupervisorStrategy.defaultDecider)
)
)
BackoffSupervisor.props(options)
}
}
//注意:下面是Supervisor的父级,不是InnerChild的父级
object ParentalActor {
case class SendToSupervisor(msg: InnerChild.TestMessage)
case class SendToInnerChild(msg: InnerChild.TestMessage)
case class SendToChildSelection(msg: InnerChild.TestMessage)
def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
import ParentalActor._
//在这里构建子级Actor supervisor
val supervisor = context.actorOf(Supervisor.props,"supervisor")
supervisor ! BackoffSupervisor.getCurrentChild //要求supervisor返回当前子级Actor
var innerChild: Option[ActorRef] = None //返回的当前子级ActorRef
val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
override def receive: Receive = {
case BackoffSupervisor.CurrentChild(ref) => //收到子级Actor信息
innerChild = ref
case SendToSupervisor(msg) => supervisor ! msg
case SendToChildSelection(msg) => selectedChild ! msg
case SendToInnerChild(msg) => innerChild foreach(child => child ! msg)
}

}
object BackoffSupervisorDemo extends App {
import ParentalActor._
val testSystem = ActorSystem("testSystem")
val parent = testSystem.actorOf(ParentalActor.props,"parent")
Thread.sleep(1000) //wait for BackoffSupervisor.CurrentChild(ref) received
parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
scala.io.StdIn.readLine()
testSystem.terminate()
}

运行结果如下:

[INFO] [10/13/2018 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
[INFO] [10/13/2018 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
[INFO] [10/13/2018 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

从结果可以看出,虽然在上面的例子里我们分别向supervisor,innerChild,selectedChild发送消息。但最终所有消息都是由InnerChild响应的。





































































































以上是关于Akka之BackoffSupervisor的主要内容,如果未能解决你的问题,请参考以下文章

Scala 学习 并发编程模型Akka

从主管重新启动后向参与者发送消息

akka-stream之异常处理

大数据技术之_19_Spark学习_06_Spark 源码解析小结

Akka Stream之Graph

[scala] akka actor编程