Akka框架使用注意点

Posted 明也无涯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Akka框架使用注意点相关的知识,希望对你有一定的参考价值。

1.mailbox

Akka的每个actor默认有一个mailbox,按照FIFO顺序单线程处理。在抛出异常导致父actor根据设置的监管策略执行重启或恢复操作时,会从触发异常的消息的后续消息开始处理,邮箱并不会被清空。如果你想重新处理那个触发异常的消息,可以通过重写preRestart方法来访问该消息,java 中的preRestart参数为(Throwable reason, Option<Object> message),message.get()可以获得该消息(因为是从Option对象中get,所以可能为空),可以将该消息再次发给自己或做其它处理。

默认邮箱的大小没有限制,也就是内存的上限。可以设置bounded邮箱来限定大小,还可以设置邮箱以文件形式持久存储。

2.监管策略设置

  1)在actor类中重写supervisorStrategy()

  2)创建父actor时在Props参数中使用FromConfig.getInstance().withSupervisorStrategy(strategy).props(XXX)

可以使用下面的类来方便设置:

 

技术分享
import akka.actor.AllForOneStrategy;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.japi.Function;
import scala.concurrent.duration.Duration;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static akka.actor.SupervisorStrategy.escalate;

/**
 * Created by fyk on 16-4-2.
 */
public class StrategySetter {
    private Map<Class<? extends Throwable>, SupervisorStrategy.Directive> map;
    private boolean oneForOne;
    private int maxNrOfRetries=5;
    private Duration withinTimeRange=Duration.create(1, TimeUnit.MINUTES);//Duration.create("1 minute")
    public StrategySetter(boolean oneForOne) {
        this.oneForOne=oneForOne;
        map=new HashMap<Class<? extends Throwable>, SupervisorStrategy.Directive>();
    }
    public void setOptParam(int maxNrOfRetries,Duration withinTimeRange){
        this.maxNrOfRetries=maxNrOfRetries;
        this.withinTimeRange=withinTimeRange;
    }
    public void put(Class<? extends Throwable> t, SupervisorStrategy.Directive action){
        map.put(t,action);
    }
    /**
     * 设定监管策略并返回
     * cls.isInstance(yourObject)
     * instead of using the instanceof operator, which can only be used if you know the class at compile time.
     */
    public SupervisorStrategy getSupervisorStrategy(){
        SupervisorStrategy strategy=null;
        if(oneForOne){
            strategy=new OneForOneStrategy(maxNrOfRetries, withinTimeRange,
                    new Function<Throwable, SupervisorStrategy.Directive>() {
                        @Override
                        public SupervisorStrategy.Directive apply(Throwable t) {
                            for(Class c:map.keySet()){
                                if(c.isInstance(t)) return map.get(c);
                            }
                            return escalate();//提交给上一级监管
                        }
                    });
        }else{
            strategy=new AllForOneStrategy(maxNrOfRetries, withinTimeRange,
                    new Function<Throwable, SupervisorStrategy.Directive>() {
                        @Override
                        public SupervisorStrategy.Directive apply(Throwable t) {
                            for(Class c:map.keySet()){
                                if(c.isInstance(t)) return map.get(c);
                            }
                            return escalate();//提交给上一级监管
                        }
                    });
        }

        return strategy;
    }
}
View Code

 

3.continue...

 

以上是关于Akka框架使用注意点的主要内容,如果未能解决你的问题,请参考以下文章

Akka中使用Logback日志框架

Akka 框架支持查找重复消息

使用 Akka 和 Websockets 玩框架

使用akka框架和scala语言编写简单的RPC通信案例

Scala框架Akka学习

利用Akka并行执行SparkSQL任务