Event Sourcing (Other Patterns)

Posted by zhuxudong

Event Sourcing

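import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.Writer;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

/**
 * Event sourcing pattern:
 * Instead of storing just the current state of the data in a domain,
 * use an append-only store to record the full series of actions taken on that data.
 * The store acts as the system of record and can be used to materialize the domain objects.
 * This can simplify tasks in complex domains, by avoiding the need to synchronize the data model
 * and the business domain, while improving performance, scalability, and responsiveness.
 * It can also provide consistency for transactional data, and maintain full audit trails
 * and history that can enable compensating actions.
 */
@Slf4j
public class EventSourcing {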
    /**
     * The constant ACCOUNT OF DAENERYS.
     */
    public static final int ACCOUNT_OF_DAENERYS = 1;
    /**
     * The constant ACCOUNT OF JON.
     */
    public static final int ACCOUNT_OF_JON = 2;

    @Test
    public void all() {
        DomainEventProcessor eventProcessor = new DomainEventProcessor();

        log.info("Running the system first time............");
        eventProcessor.reset();

        log.info("Creating the accounts............");
        eventProcessor
        .process(new AccountCreateEvent(0, new Date().getTime(), ACCOUNT_OF_DAENERYS, "Daenerys Targaryen"));

        eventProcessor.process(new AccountCreateEvent(1, new Date().getTime(), ACCOUNT_OF_JON, "Jon Snow"));

        log.info("Do some money operations............");

        eventProcessor
        .process(new MoneyDepositEvent(2, new Date().getTime(), ACCOUNT_OF_DAENERYS, new BigDecimal("100000")));

        eventProcessor.process(new MoneyDepositEvent(3, new Date().getTime(), ACCOUNT_OF_JON, new BigDecimal("100")));

        eventProcessor.process(new MoneyTransferEvent(4, new Date().getTime(), new BigDecimal("10000"),
                ACCOUNT_OF_DAENERYS, ACCOUNT_OF_JON));

        log.info("...............State:............");
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());

        log.info("At this point the system was shut down, state in memory is cleared............");
        AccountAggregate.resetState();

        log.info("Recover the system by the events in journal file............");

        eventProcessor = new DomainEventProcessor();
        eventProcessor.recover();

        log.info("...............Recovered State:............");
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
    }
}

/**
 * 1) Abstraction of the events that need to be persisted.
 */
@Data
@RequiredArgsConstructor
abstract class DomainEvent implements Serializable {
    private static final long serialVersionUID = 5922894715338132042L;
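    /**
     * Unique sequence number.
     */
    private final long sequenceId;
    /**
     * Creation time (epoch milliseconds).
     */
    private final long createdTime;
    /**
     * Name of the concrete event type, used to pick the class when reading the journal back.
     */
    private final String eventClassName;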
    private boolean realTime = true;

    public abstract void process();
}

class AccountAggregate {
    private static Map<Integer, Account> accounts = new ConcurrentHashMap<>();

    private AccountAggregate() {
    }

    public static void putAccount(Account account) {
        accounts.put(account.getAccountNo(), account);
    }

    public static Account getAccount(int accountNo) {
        final Account account = accounts.get(accountNo);
        if (account == null) {
            return null;
        }
        return account.copy();
    }

    public static void resetState() {
        accounts = new ConcurrentHashMap<>();
    }
}

/**
 * 2) Concrete events that need to be persisted.
 */
@Data
class AccountCreateEvent extends DomainEvent {
    private static final long serialVersionUID = -493304186114851718L;
    private final int accountNo;
    private final String owner;

    public AccountCreateEvent(long sequenceId, long createdTime, int accountNo, String owner) {
        super(sequenceId, createdTime, "AccountCreateEvent");
        this.accountNo = accountNo;
        this.owner = owner;
    }

    @Override
    public void process() {
        Account account = AccountAggregate.getAccount(accountNo);
        if (account != null) {
            throw new RuntimeException("Account already exists");
        }
        account = new Account(accountNo, owner);
        account.handleEvent(this);
    }
}

@Data
class MoneyDepositEvent extends DomainEvent {
    private final BigDecimal money;
    private final int accountNo;

    public MoneyDepositEvent(long sequenceId, long createdTime, int accountNo, BigDecimal money) {
        super(sequenceId, createdTime, "MoneyDepositEvent");
        this.money = money;
        this.accountNo = accountNo;
    }

    @Override
    public void process() {
        final Account account = AccountAggregate.getAccount(accountNo);
        if (account == null) {
            throw new RuntimeException("Account not found");
        }
        account.handleEvent(this);
    }
}

@Data
class MoneyTransferEvent extends DomainEvent {
    private static final long serialVersionUID = -5846383677434713494L;
    private final BigDecimal money;
    private final int accountNoFrom;
    private final int accountNoTo;

    public MoneyTransferEvent(long sequenceId, long createdTime, BigDecimal money, int accountNoFrom, int accountNoTo) {
        super(sequenceId, createdTime, "MoneyTransferEvent");
        this.money = money;
        this.accountNoFrom = accountNoFrom;
        this.accountNoTo = accountNoTo;
    }

    @Override
    public void process() {
        final Account accountFrom = AccountAggregate.getAccount(accountNoFrom);
        if (accountFrom == null) {
            throw new RuntimeException("Account not found " + accountNoFrom);
        }
        final Account accountTo = AccountAggregate.getAccount(accountNoTo);
        if (accountTo == null) {
            throw new RuntimeException("Account not found " + accountNoTo);
        }

        accountFrom.handleTransferFromEvent(this);
        accountTo.handleTransferToEvent(this);
    }
}

@Data
@Slf4j
class Account {
    private final int accountNo;
    private final String owner;
    private BigDecimal money;

    public Account(int accountNo, String owner) {
        this.accountNo = accountNo;
        this.owner = owner;
        money = BigDecimal.ZERO;
    }

    public Account copy() {
        final Account account = new Account(accountNo, owner);
        account.setMoney(money);
        return account;
    }

    private void depositMoney(BigDecimal money) {
        this.money = this.money.add(money);
    }

    private void withdrawMoney(BigDecimal money) {
        this.money = this.money.subtract(money);
    }

    private void handleDeposit(BigDecimal money, boolean realTime) {
        depositMoney(money);
        AccountAggregate.putAccount(this);
        if (realTime) {
            log.info("Some external api for only realtime execution could be called here.");
        }
    }

    private void handleWithdrawal(BigDecimal money, boolean realTime) {
        if (this.money.compareTo(money) < 0) {
            throw new RuntimeException("Insufficient Account Balance");
        }

        withdrawMoney(money);
        AccountAggregate.putAccount(this);
        if (realTime) {
            log.info("Some external api for only realtime execution could be called here.");
        }
    }

    public void handleEvent(AccountCreateEvent accountCreateEvent) {
        AccountAggregate.putAccount(this);
        if (accountCreateEvent.isRealTime()) {
            log.info("Some external api for only realtime execution could be called here.");
        }
    }

    public void handleEvent(MoneyDepositEvent moneyDepositEvent) {
        handleDeposit(moneyDepositEvent.getMoney(), moneyDepositEvent.isRealTime());
    }

    public void handleTransferFromEvent(MoneyTransferEvent moneyTransferEvent) {
        handleWithdrawal(moneyTransferEvent.getMoney(), moneyTransferEvent.isRealTime());
    }

    public void handleTransferToEvent(MoneyTransferEvent moneyTransferEvent) {
        handleDeposit(moneyTransferEvent.getMoney(), moneyTransferEvent.isRealTime());
    }
}

/**
 * 3) Event journal: appends events to a file and reads them back for replay.
 */
class JsonFileJournal {
    private final File aFile;
    private List<String> events;
    private int index = 0;

    public JsonFileJournal() {
        aFile = new File("Journal.json");
        try {
            events = Files.lines(aFile.toPath()).collect(Collectors.toList());
        } catch (final IOException e) {
            events = Lists.newArrayList();
        }
    }

    public void write(DomainEvent domainEvent) {
        final Gson gson = new Gson();
        JsonElement jsonElement;
        if (domainEvent instanceof AccountCreateEvent) {
            jsonElement = gson.toJsonTree(domainEvent, AccountCreateEvent.class);
        } else if (domainEvent instanceof MoneyDepositEvent) {
            jsonElement = gson.toJsonTree(domainEvent, MoneyDepositEvent.class);
        } else if (domainEvent instanceof MoneyTransferEvent) {
            jsonElement = gson.toJsonTree(domainEvent, MoneyTransferEvent.class);
        } else {
            throw new RuntimeException("Journal event not recognized");
        }

        try (Writer output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(aFile, true), "UTF-8"))) {
            final String eventString = jsonElement.toString();
            output.write(eventString + "\n");
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void reset() {
        aFile.delete();
    }

    public DomainEvent readNext() {
        if (index >= events.size()) {
            return null;
        }
        final String event = events.get(index);
        index++;

        final JsonParser parser = new JsonParser();
        final JsonElement jsonElement = parser.parse(event);
        final String eventClassName = jsonElement.getAsJsonObject().get("eventClassName").getAsString();
        final Gson gson = new Gson();
        DomainEvent domainEvent;
        if (eventClassName.equals("AccountCreateEvent")) {
            domainEvent = gson.fromJson(jsonElement, AccountCreateEvent.class);
        } else if (eventClassName.equals("MoneyDepositEvent")) {
            domainEvent = gson.fromJson(jsonElement, MoneyDepositEvent.class);
        } else if (eventClassName.equals("MoneyTransferEvent")) {
            domainEvent = gson.fromJson(jsonElement, MoneyTransferEvent.class);
        } else {
            throw new RuntimeException("Journal event not recognized");
        }

        domainEvent.setRealTime(false);
        return domainEvent;
    }
}

/**
 * 4) Processor that persists events and replays them during recovery.
 */
class DomainEventProcessor {
    private final JsonFileJournal processorJournal = new JsonFileJournal();

    public void process(DomainEvent domainEvent) {
        domainEvent.process();
        processorJournal.write(domainEvent);
    }

    public void reset() {
        processorJournal.reset();
    }

    public void recover() {
        DomainEvent domainEvent;
        while (true) {
            domainEvent = processorJournal.readNext();
            if (domainEvent == null) {
                break;
            } else {
                domainEvent.process();
            }
        }
    }
}
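
The recovery step in the listing above comes down to one idea: current state is never stored, it is recomputed by folding over the journal in order. The sketch below isolates that idea using only the JDK. It is an illustration under stated assumptions, not part of the original code: `ReplaySketch`, `BalanceChanged`, and the in-memory `Journal` are hypothetical names, and the in-memory list stands in for the `Journal.json` file. It also shows one way a compensating action might look, by appending inverse events rather than editing history.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {
    /** Hypothetical minimal event: a signed amount applied to one account. */
    static final class BalanceChanged {
        final long sequenceId;
        final int accountNo;
        final BigDecimal delta;

        BalanceChanged(long sequenceId, int accountNo, BigDecimal delta) {
            this.sequenceId = sequenceId;
            this.accountNo = accountNo;
            this.delta = delta;
        }
    }

    /** Append-only, in-memory stand-in for the journal file (an assumption of this sketch). */
    static final class Journal {
        private final List<BalanceChanged> events = new ArrayList<>();

        /** Events are only ever added, never updated or removed. */
        void append(BalanceChanged event) {
            events.add(event);
        }

        /** Rebuilds all balances from scratch by applying every event in append order. */
        Map<Integer, BigDecimal> replay() {
            Map<Integer, BigDecimal> balances = new HashMap<>();
            for (BalanceChanged e : events) {
                balances.merge(e.accountNo, e.delta, BigDecimal::add);
            }
            return balances;
        }
    }

    public static void main(String[] args) {
        Journal journal = new Journal();
        journal.append(new BalanceChanged(0, 1, new BigDecimal("100000")));
        journal.append(new BalanceChanged(1, 2, new BigDecimal("100")));
        // A transfer of 10000 from account 1 to account 2, recorded as two events.
        journal.append(new BalanceChanged(2, 1, new BigDecimal("-10000")));
        journal.append(new BalanceChanged(3, 2, new BigDecimal("10000")));
        // Compensating action: undo the transfer by appending the inverse events.
        journal.append(new BalanceChanged(4, 2, new BigDecimal("-10000")));
        journal.append(new BalanceChanged(5, 1, new BigDecimal("10000")));

        System.out.println(journal.replay());
    }
}
```

Because `replay()` starts from an empty map every time, calling it after a restart gives the same balances as before the restart, provided the journal itself survived. That is the property the `recover()` method in the listing relies on.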
