使用Akka持久化——持久化与快照
Posted 泰山不老生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Akka持久化——持久化与快照相关的知识,希望对你有一定的参考价值。
前言
对于java web而言,一个用户的HTTP请求最终会转换为一条java线程处理。HTTP本身是无状态的,具体的请求逻辑一般也是无状态的。如果进程奔溃或者系统宕机,用户会发觉当前网页不可用之类的错误。虽然会影响一些用户体验,但是只要服务重启了,用户依然可以完成他的请求并满足其需要。但是有些情况下则势必会造成混乱甚至恐慌,例如跨行转账。用户从自己A银行的账户转账1万元至自己在B银行的账户,如果转出的动作成功了,但是转入却失败了,用户的心情是可想而知的,自己的财产不翼而飞了!一种解决的方式是引入事务,在此场景下还必须是分布式事务。如果只是银行内部实现分布式事务多少还是可行的,但是不同银行之间要实现的成本是可想而知的,甚至不可行的。如果A银行转出时对用户的状态作持久化,B银行对收到的转入请求也进行持久化,那么恢复用户的损失才有可能。
以上啰里啰嗦说了这么多,无非就是抛出个引子,进而介绍Akka提供的持久化功能。
Akka的持久化架构
- UntypedPersistentActor
- UntypedPersistentView
- UntypedPersistentActorAtLeastOnceDelivery
- AsyncWriteJournal
- Snapshot store
本文基于Akka官网提供的持久化例子,并对其进行一些适应性改造,将着重介绍UntypedPersistentActor、AsyncWriteJournal及Snapshot store的应用。
配置
有关Akka的日志持久化和快照持久化的配置如下:
persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
leveldb.dir = "target/example/journal"
leveldb.native = false
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
local.dir = "target/example/snapshots"
}
}
根据配置,我们知道日志插件使用了leveldb,leveldb的存储目录为当前项目编译路径下的example/journal路径下。快照插件使用了local,存储路径与前者相同。
持久化Actor的例子
消息与状态
本例子中需要用到Cmd和Evt两种消息,Cmd代表命令,Evt代表事件。ExampleState代表我们例子中的状态。以上三个类的定义如下:
public interface Persistence {
public static class Cmd implements Serializable {
private static final long serialVersionUID = 1L;
private final String data;
public Cmd(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
public static class Evt implements Serializable {
private static final long serialVersionUID = 1L;
private final String data;
public Evt(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
public static class ExampleState implements Serializable {
private static final long serialVersionUID = 1L;
private final ArrayList<String> events;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> events) {
this.events = events;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(events));
}
public void update(Evt evt) {
events.add(evt.getData());
}
public int size() {
return events.size();
}
@Override
public String toString() {
return events.toString();
}
}
}
上面代码展示的Cmd和Evt都很简单,它们有一样的data字段作为内容。ExampleState中维护了一个列表,次列表用于缓存所有的事件内容。
持久化Actor的实现
在具体介绍本例中持久化Actor之前,先看看其实现,其代码清单如下:
@Named("ExamplePersistentActor")
@Scope("prototype")
public class ExamplePersistentActor extends UntypedPersistentActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public String persistenceId() {
return "sample-id-1";
}
private ExampleState state = new ExampleState();
public int getNumEvents() {
return state.size();
}
@Override
public void onReceiveRecover(Object msg) {
if (msg instanceof Evt) {
state.update((Evt) msg);
} else if (msg instanceof SnapshotOffer) {
state = (ExampleState) ((SnapshotOffer) msg).snapshot();
} else {
unhandled(msg);
}
}
@Override
public void onReceiveCommand(Object msg) {
if (msg instanceof Cmd) {
final String data = ((Cmd) msg).getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
persistAll(Arrays.asList(evt1, evt2), new Procedure<Evt>() {
public void apply(Evt evt) throws Exception {
state.update(evt);
if (evt.equals(evt2)) {
getContext().system().eventStream().publish(evt);
}
}
});
} else if (msg.equals("snap")) {
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy());
} else if (msg.equals("print")) {
log.info(state.toString());
} else {
unhandled(msg);
}
}
}
ExamplePersistentActor继承了UntypedPersistentActor,并覆盖实现了三个方法:
- persistenceId:持久化Actor必须有一个标识符,此标识符必须通过persistenceId方法定义;
- onReceiveRecover:此方法将在恢复期间被调用,并交由用户处理那些持久化的消息或者快照;
- onReceiveCommand:此方法用于处理正常的消息;
以上是关于使用Akka持久化——持久化与快照的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段
图解Redis,谈谈Redis的持久化,RDB快照与AOF日志