事件溯源其他模式
Posted zhuxudong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了“事件溯源(其他模式)”相关的知识,希望对你有一定的参考价值。
事件溯源
/**
 * Event Sourcing pattern demo.
 *
 * <p>Instead of storing just the current state of the data in a domain, use an append-only store
 * to record the full series of actions taken on that data. The store acts as the system of record
 * and can be used to materialize the domain objects. This can simplify tasks in complex domains,
 * by avoiding the need to synchronize the data model and the business domain, while improving
 * performance, scalability, and responsiveness. It can also provide consistency for transactional
 * data, and maintain full audit trails and history that can enable compensating actions.
 */
@Slf4j
public class EventSourcing {
    /** Account number used for Daenerys in the walkthrough. */
    public static final int ACCOUNT_OF_DAENERYS = 1;

    /** Account number used for Jon in the walkthrough. */
    public static final int ACCOUNT_OF_JON = 2;

    /**
     * Walks through the whole scenario: wipe the journal, create two accounts, move some money,
     * drop the in-memory state and finally rebuild it by replaying the journal. The state of
     * both accounts is logged before the simulated crash and after the recovery.
     */
    @Test
    public void all() {
        final DomainEventProcessor liveProcessor = new DomainEventProcessor();

        log.info("Running the system first time............");
        liveProcessor.reset();

        log.info("Creating th accounts............");
        liveProcessor.process(
                new AccountCreateEvent(0, System.currentTimeMillis(), ACCOUNT_OF_DAENERYS, "Daenerys Targaryen"));
        liveProcessor.process(
                new AccountCreateEvent(1, System.currentTimeMillis(), ACCOUNT_OF_JON, "Jon Snow"));

        log.info("Do some money operations............");
        liveProcessor.process(
                new MoneyDepositEvent(2, System.currentTimeMillis(), ACCOUNT_OF_DAENERYS, new BigDecimal("100000")));
        liveProcessor.process(
                new MoneyDepositEvent(3, System.currentTimeMillis(), ACCOUNT_OF_JON, new BigDecimal("100")));
        liveProcessor.process(
                new MoneyTransferEvent(4, System.currentTimeMillis(), new BigDecimal("10000"),
                        ACCOUNT_OF_DAENERYS, ACCOUNT_OF_JON));

        log.info("...............State:............");
        logAccountStates();

        log.info("At that point system had a shot down, state in memory is cleared............");
        AccountAggregate.resetState();

        log.info("Recover the system by the events in journal file............");
        // A fresh processor re-reads the journal file when it is constructed.
        final DomainEventProcessor replayProcessor = new DomainEventProcessor();
        replayProcessor.recover();

        log.info("...............Recovered State:............");
        logAccountStates();
    }

    /** Logs the current state of both demo accounts, Daenerys first. */
    private static void logAccountStates() {
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
    }
}
/**
 * 1) Abstraction of an event that needs to be persisted.
 *
 * <p>The three final fields are supplied through the Lombok-generated constructor; the concrete
 * events in this file call it as {@code super(sequenceId, createdTime, "<EventName>")}, so the
 * declaration order of those fields must not change.
 */
@Data
@RequiredArgsConstructor
abstract class DomainEvent implements Serializable {
private static final long serialVersionUID = 5922894715338132042L;
/**
 * Unique sequence number of the event.
 */
private final long sequenceId;
/**
 * Creation time of the event (callers in this file pass epoch milliseconds).
 */
private final long createdTime;
/**
 * Name of the concrete event type; the journal reads this value back to decide which
 * event class to deserialize into.
 */
private final String eventClassName;
// True for newly created events; the journal sets it to false on events it reads back,
// and the account handlers only log their "external api" message when it is true.
private boolean realTime = true;
/**
 * Applies this event to the domain state.
 */
public abstract void process();
}
/**
 * In-memory registry of the current {@link Account} state, keyed by account number.
 *
 * <p>All access goes through static methods; the class cannot be instantiated.
 */
class AccountAggregate {
    // Final so every thread always sees the same map instance; resetState() empties it in place
    // instead of swapping in a new map through a non-volatile field.
    private static final Map<Integer, Account> accounts = new ConcurrentHashMap<>();

    private AccountAggregate() {
    }

    /**
     * Stores (or replaces) the given account under its account number.
     *
     * @param account the account state to register
     */
    public static void putAccount(Account account) {
        accounts.put(account.getAccountNo(), account);
    }

    /**
     * Looks up an account.
     *
     * @param accountNo the account number to look up
     * @return a copy of the stored account, or {@code null} if no such account is registered;
     *         changes to the copy only become visible after it is passed to
     *         {@link #putAccount(Account)}
     */
    public static Account getAccount(int accountNo) {
        final Account stored = accounts.get(accountNo);
        return stored == null ? null : stored.copy();
    }

    /**
     * Drops every registered account.
     */
    public static void resetState() {
        accounts.clear();
    }
}
/**
 * 2) Concrete events that need to be persisted.
 *
 * <p>This one records the creation of an account with a given number and owner.
 */
@Data
class AccountCreateEvent extends DomainEvent {
    private static final long serialVersionUID = -493304186114851718L;

    private final int accountNo;
    private final String owner;

    /**
     * @param sequenceId  sequence number of the event
     * @param createdTime creation time of the event
     * @param accountNo   number of the account to create
     * @param owner       owner of the account to create
     */
    public AccountCreateEvent(long sequenceId, long createdTime, int accountNo, String owner) {
        super(sequenceId, createdTime, "AccountCreateEvent");
        this.owner = owner;
        this.accountNo = accountNo;
    }

    /**
     * Creates the account and lets it register itself.
     *
     * @throws RuntimeException if an account with this number is already registered
     */
    @Override
    public void process() {
        final boolean alreadyRegistered = AccountAggregate.getAccount(accountNo) != null;
        if (alreadyRegistered) {
            throw new RuntimeException("Account already exists");
        }
        new Account(accountNo, owner).handleEvent(this);
    }
}
/**
 * Event recording a deposit of money into a single account.
 */
@Data
class MoneyDepositEvent extends DomainEvent {
    // Declared explicitly, like the other events in this file, so the serialized form does not
    // depend on a compiler-derived default.
    private static final long serialVersionUID = -4172389451047620593L;

    private final BigDecimal money;
    private final int accountNo;

    /**
     * @param sequenceId  sequence number of the event
     * @param createdTime creation time of the event
     * @param accountNo   number of the account receiving the money
     * @param money       amount to deposit
     */
    public MoneyDepositEvent(long sequenceId, long createdTime, int accountNo, BigDecimal money) {
        super(sequenceId, createdTime, "MoneyDepositEvent");
        this.money = money;
        this.accountNo = accountNo;
    }

    /**
     * Applies the deposit to the target account.
     *
     * @throws RuntimeException if the target account is not registered
     */
    @Override
    public void process() {
        final Account account = AccountAggregate.getAccount(accountNo);
        if (account == null) {
            throw new RuntimeException("Account not found");
        }
        account.handleEvent(this);
    }
}
/**
 * Event recording a transfer of money from one account to another.
 */
@Data
class MoneyTransferEvent extends DomainEvent {
    private static final long serialVersionUID = -5846383677434713494L;

    private final BigDecimal money;
    private final int accountNoFrom;
    private final int accountNoTo;

    /**
     * @param sequenceId    sequence number of the event
     * @param createdTime   creation time of the event
     * @param money         amount to transfer
     * @param accountNoFrom number of the account the money is taken from
     * @param accountNoTo   number of the account the money is added to
     */
    public MoneyTransferEvent(long sequenceId, long createdTime, BigDecimal money, int accountNoFrom, int accountNoTo) {
        super(sequenceId, createdTime, "MoneyTransferEvent");
        this.money = money;
        this.accountNoFrom = accountNoFrom;
        this.accountNoTo = accountNoTo;
    }

    /**
     * Withdraws the amount from the source account, then deposits it into the destination.
     *
     * @throws RuntimeException if either account is not registered, or if the withdrawal is
     *                          rejected by the source account
     */
    @Override
    public void process() {
        final Account accountFrom = AccountAggregate.getAccount(accountNoFrom);
        if (accountFrom == null) {
            throw new RuntimeException("Account not found " + accountNoFrom);
        }
        // Validate the destination up front so nothing is withdrawn when it does not exist.
        // The message reports the requested number, not the (null) lookup result.
        if (AccountAggregate.getAccount(accountNoTo) == null) {
            throw new RuntimeException("Account not found " + accountNoTo);
        }
        accountFrom.handleTransferFromEvent(this);
        // getAccount() hands out copies, so the destination is read only after the withdrawal
        // has been stored. When source and destination are the same account, a copy taken
        // earlier would still hold the pre-withdrawal balance and overwrite the withdrawal.
        final Account accountTo = AccountAggregate.getAccount(accountNoTo);
        accountTo.handleTransferToEvent(this);
    }
}
/**
 * A bank account: number, owner and current balance.
 *
 * <p>Every handler applies its change to this instance and then stores the instance in
 * {@link AccountAggregate}.
 */
@Data
@Slf4j
class Account {
    private final int accountNo;
    private final String owner;
    private BigDecimal money;

    /**
     * Creates an account with a zero balance.
     *
     * @param accountNo the account number
     * @param owner     the account owner
     */
    public Account(int accountNo, String owner) {
        this.accountNo = accountNo;
        this.owner = owner;
        this.money = BigDecimal.ZERO;
    }

    /**
     * @return a new account with the same number, owner and balance
     */
    public Account copy() {
        final Account duplicate = new Account(accountNo, owner);
        duplicate.money = money;
        return duplicate;
    }

    /** Stores this account in the aggregate and, for real-time events only, logs the hook message. */
    private void storeAndNotify(boolean realTime) {
        AccountAggregate.putAccount(this);
        if (!realTime) {
            return;
        }
        log.info("Some external api for only realtime execution could be called here.");
    }

    /** Adds {@code amount} to the balance and stores the result. */
    private void handleDeposit(BigDecimal amount, boolean realTime) {
        money = money.add(amount);
        storeAndNotify(realTime);
    }

    /** Subtracts {@code amount} from the balance and stores the result; rejects overdrafts. */
    private void handleWithdrawal(BigDecimal amount, boolean realTime) {
        final boolean insufficient = money.compareTo(amount) < 0;
        if (insufficient) {
            throw new RuntimeException("Insufficient Account Balance");
        }
        money = money.subtract(amount);
        storeAndNotify(realTime);
    }

    /**
     * Registers this newly created account.
     *
     * @param accountCreateEvent the creation event being applied
     */
    public void handleEvent(AccountCreateEvent accountCreateEvent) {
        storeAndNotify(accountCreateEvent.isRealTime());
    }

    /**
     * Applies a deposit to this account.
     *
     * @param moneyDepositEvent the deposit event being applied
     */
    public void handleEvent(MoneyDepositEvent moneyDepositEvent) {
        final BigDecimal amount = moneyDepositEvent.getMoney();
        handleDeposit(amount, moneyDepositEvent.isRealTime());
    }

    /**
     * Applies the outgoing side of a transfer to this account.
     *
     * @param moneyTransferEvent the transfer event being applied
     * @throws RuntimeException if the balance is lower than the transferred amount
     */
    public void handleTransferFromEvent(MoneyTransferEvent moneyTransferEvent) {
        final BigDecimal amount = moneyTransferEvent.getMoney();
        handleWithdrawal(amount, moneyTransferEvent.isRealTime());
    }

    /**
     * Applies the incoming side of a transfer to this account.
     *
     * @param moneyTransferEvent the transfer event being applied
     */
    public void handleTransferToEvent(MoneyTransferEvent moneyTransferEvent) {
        final BigDecimal amount = moneyTransferEvent.getMoney();
        handleDeposit(amount, moneyTransferEvent.isRealTime());
    }
}
/**
* 3)事件回溯
*/
class JsonFileJournal {
private final File aFile;
private List<String> events;
private int index = 0;
public JsonFileJournal() {
aFile = new File("Journal.json");
try {
events = Files.lines(aFile.toPath()).collect(Collectors.toList());
} catch (final IOException e) {
events = Lists.newArrayList();
}
}
public void write(DomainEvent domainEvent) {
final Gson gson = new Gson();
JsonElement jsonElement;
if (domainEvent instanceof AccountCreateEvent) {
jsonElement = gson.toJsonTree(domainEvent, AccountCreateEvent.class);
} else if (domainEvent instanceof MoneyDepositEvent) {
jsonElement = gson.toJsonTree(domainEvent, MoneyDepositEvent.class);
} else if (domainEvent instanceof MoneyTransferEvent) {
jsonElement = gson.toJsonTree(domainEvent, MoneyTransferEvent.class);
} else {
throw new RuntimeException("Journal Event not recegnized");
}
try (Writer output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(aFile, true), "UTF-8"))) {
final String eventString = jsonElement.toString();
output.write(eventString + "
");
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
public void reset() {
aFile.delete();
}
public DomainEvent readNext() {
if (index >= events.size()) {
return null;
}
final String event = events.get(index);
index++;
final JsonParser parser = new JsonParser();
final JsonElement jsonElement = parser.parse(event);
final String eventClassName = jsonElement.getAsJsonObject().get("eventClassName").getAsString();
final Gson gson = new Gson();
DomainEvent domainEvent;
if (eventClassName.equals("AccountCreateEvent")) {
domainEvent = gson.fromJson(jsonElement, AccountCreateEvent.class);
} else if (eventClassName.equals("MoneyDepositEvent")) {
domainEvent = gson.fromJson(jsonElement, MoneyDepositEvent.class);
} else if (eventClassName.equals("MoneyTransferEvent")) {
domainEvent = gson.fromJson(jsonElement, MoneyTransferEvent.class);
} else {
throw new RuntimeException("Journal Event not recegnized");
}
domainEvent.setRealTime(false);
return domainEvent;
}
}
/**
 * 4) Processor that persists events and replays them.
 */
class DomainEventProcessor {
    private final JsonFileJournal processorJournal = new JsonFileJournal();

    /**
     * Applies the event to the domain state and then appends it to the journal.
     *
     * @param domainEvent the event to apply and persist
     */
    public void process(DomainEvent domainEvent) {
        domainEvent.process();
        processorJournal.write(domainEvent);
    }

    /**
     * Resets the underlying journal.
     */
    public void reset() {
        processorJournal.reset();
    }

    /**
     * Replays every event the journal still has to offer, in journal order, without writing
     * them to the journal again.
     */
    public void recover() {
        for (DomainEvent replayed = processorJournal.readNext();
                replayed != null;
                replayed = processorJournal.readNext()) {
            replayed.process();
        }
    }
}
以上是关于“事件溯源(其他模式)”的主要内容。如果未能解决你的问题,请参考以下文章。