Axon Framework - saga 基础设施

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Axon Framework - saga 基础设施相关的知识,希望对你有一定的参考价值。

参考技术A

事件需要重定向到适当的 saga 实例。 为此,需要一些基础设施类。 最重要的组件是 SagaManager 和 SagaRepository 。

与处理事件的任何组件一样,处理是由事件处理器完成的。 但是,sagas 不是处理事件的单例实例。 他们有各自的生命周期,需要加以管理。

Axon 通过 AnnotatedSagaManager 支持生命周期管理,它被提供给事件处理器以执行处理程序的实际调用。 它使用要管理的 saga 的类型以及可以存储和检索该类型的 saga 的 SagaRepository 进行初始化。 单个 AnnotatedSagaManager 只能管理单个 saga 类型。

使用配置 API 时,Axon 将为大多数组件使用合理的默认值。 但是,强烈建议定义一个 SagaStore 实现来使用。 SagaStore 是在某处“物理”存储 saga 实例的机制。 AnnotatedSagaRepository (默认)使用 SagaStore 来存储和检索需要的 Saga 实例。

Axon Configuration API

Spring Boot AutoConfiguration

SagaRepository 负责存储和检索 saga,供 SagaManager 使用。 它能够通过它们的标识符以及它们的关联值来检索特定的 saga 实例。

但是,有一些特殊要求。 由于 saga 中的并发处理是一个非常微妙的过程,repository 必须确保对于每个概念 saga 实例(具有相同的标识符)在 JVM 中仅存在一个实例。

Axon 提供 AnnotatedSagaRepository 实现,它允许查找 saga 实例,同时保证只能同时访问 saga 的单个实例。 它使用 SagaStore 来执行 saga 实例的实际持久化。

使用的实现选择主要取决于应用程序使用的存储引擎。 Axon 提供了 JdbcSagaStore 、 InMemorySagaStore 、 JpaSagaStore 和 MongoSagaStore 。

在某些情况下,应用程序受益于缓存 saga 实例。 在这种情况下,有一个 CachingSagaStore 包装了另一个实现以添加缓存行为。 请注意, CachingSagaStore 是直写式缓存,这意味着保存操作始终会立即转发到后备存储,以确保数据安全。

JpaSagaStore 使用 JPA 来存储 sagas 的状态和关联值。 Sagas 本身不需要任何 JPA 注解; Axon 将使用 Serializer 对 sagas 进行序列化(类似于事件序列化,您可以在 XStreamSerializer 或 JacksonSerializer 之间进行选择,可以通过在应用程序中配置默认的 Serializer 来设置。有关更多详细信息,请参阅序列化程序 .

JpaSagaStore 配置有 EntityManagerProvider ,它提供对要使用的 EntityManager 实例的访问。 这种抽象允许使用应用程序管理和容器管理的 EntityManager 。 或者,您可以定义序列化程序来序列化 Saga 实例。 Axon 默认为 XStreamSerializer 。

JdbcSagaStore 使用纯 JDBC 来存储阶段实例及其关联值。 与 JpaSagaStore 类似,saga 实例不需要知道它们是如何存储的。 存储使用序列化程序序列化 saga 实例。

您应该使用 DataSource 或 ConnectionProvider 配置 JdbcSagaStore 。 虽然不是必需的,但在使用 ConnectionProvider 进行初始化时,建议将实现包装在 UnitOfWorkAwareConnectionProviderWrapper 中。 它将检查当前工作单元中是否存在已打开的数据库连接,以确保工作单元内的所有活动都在单个连接上完成。

与 JPA 不同, JdbcSagaRepository 使用纯 SQL 语句来存储和检索信息。 这种方法可能意味着某些操作依赖于特定于数据库的 SQL 方言。 也可能是某些数据库供应商提供了您想使用的非标准功能。 为此,您可以提供自己的 SagaSqlSchema 。 SagaSqlSchema 是一个接口,它定义了存储库需要在底层数据库上执行的所有操作。 它允许您自定义为每个操作执行的 SQL 语句。 默认值为 GenericSagaSqlSchema 。 其他可用的实现是 PostgresSagaSqlSchema 、 Oracle11SagaSqlSchema 和 HsqlSagaSchema 。

Schema 构建

请注意,Axon 不会为您创建开箱即用的数据库 schema 。 例如,在使用 Spring Boot 时也不行。

要构建 schema,应该调用 JdbcSagaStore#createSchema 。 默认情况下,这将使用 GenericSagaSqlSchema 。 您可以通过 JdbcSagaStore.Builder 配置不同的版本来更改 schema。

MongoSagaStore 将 saga 实例及其关联存储在 MongoDB 数据库中。 MongoSagaStore 将所有 saga 存储在 MongoDB 数据库中的单个集合中。 对于每个 saga 实例,都会创建一个文档。

MongoSagaStore 还确保在任何时候,对于单个 JVM 中的任何唯一 Saga,都只存在一个 Saga 实例。 这可确保不会因并发问题而丢失状态更改。

MongoSagaStore 使用 MongoTemplate 和可选的 Serializer 初始化。 MongoTemplate 提供了对存储 sagas 的集合的引用。Axon 提供了 DefaultMongoTemplate ,它接受一个 MongoClient 实例以及数据库名称和存储 sagas 的集合名称。数据库名称 和集合名称可以省略。 在这种情况下,它们分别默认为 “axonframework” 和 “sagas” 。

如果使用数据库支持的 saga 存储,保存和加载 saga 实例可能是一项相对昂贵的操作。 在短时间内多次调用同一个 saga 实例的情况下,缓存可能对应用程序的性能特别有益。

Axon 提供 CachingSagaStore 实现。 它是一个包装了另一个 SagaStore 的 SagaStore ,它负责实际存储。 加载 saga 或关联值时, CachingSagaStore 将首先查询其缓存,然后再委托给包装的 repository。 存储信息时,所有调用总是被委派以确保后备存储始终对 saga 的状态有一致的视图。

要配置缓存,只需将任何 SagaStore 包装在 CachingSagaStore 中。 CachingSagaStore 的构造函数采用三个参数: 1. 要包装的 SagaStore 2. 用于关联值的缓存 3. 用于 saga 实例的缓存

后两个参数可能指的是同一个缓存,也可能指不同的缓存。 这取决于您的特定应用程序的要求。

从无到有-在create-react-app基础上接入react-routerredux-saga

搭建项目框架

新建项目

执行如下代码,用create-react-app来建立项目的基础框架,然后安装需要用到的依赖。

$ npx create-react-app my-test-project
$ cd my-test-project
$ yarn add react-router-dom react-redux prop-types redux redux-saga
$ yarn start

完成后,应用启动在localhost的3000端口。

接入react-router-dom

react-router-dom其实就是react-router 4.0,与之前的3.0有什么区别呢?react-router被一分为三。react-routerreact-router-domreact-router-native

react-router实现了路由的核心的路由组件和函数。而react-router-domreact-router-native则是基于react-router,提供了特定的环境的组件。

react-router-dom依赖react-router,安装的时候,不用再显示的安装react-router, 如果你有机会去看react-router-dom的源码,就会发现里面有些组件都是从react-router中引入的。

新建layout

/src下新建layout目录。为什么要新建layout目录,因为有可能我们会用到多个layout,layout是一个什么样的概念?

例如这个应用需要提供一部分功能在微信使用。那么进入所有微信的相关界面下都要进行鉴权。没有鉴权信息就不允许访问,但是这个服务仍然有所有人都可以访问的路由。使用layout可以很好的帮我们解决这个问题。

将所有的需要鉴权的页面放在例如WechatContainer下,只有在有微信相关鉴权的信息存在,才允许访问接下来的界面,否则,容器内甚至可以直接不渲染接下来的界面。

/src/layout下新建两个文件,分别是AppLayout.jsWechatLayout.js

AppLayout.js的代码如下。在这个layout中,首页就是单纯的一个路由,导向至首页。而接下来的/wechat则是把路由导向至了一个微信端专用的layout。

import React, { Component } from ‘react‘;
import Home from ‘../routes/home‘;
import WechatLayout from ‘./WechatLayout‘;
import { Route, Switch } from ‘react-router-dom‘;

/**
 * 项目入口布局
 * 在此处根据一级路由的不同进入不同的container
 * 每个container有自己不同的作用
 *
 * 在react-router V4中,将原先统一在一处的路由分散到各个模块中,分散到各个模块当中
 * 例如: WechatLayout的路由为/wechat 表示到该layout下的默认路径
 */
class AppLayout extends Component {
  constructor(props) {
    super(props);

    this.state = {};
  }

  render() {
    return (
      <div className=‘App‘>
        <main>
          <Switch>
            <Route path=‘/‘ exact component={Home} />
            <Route path=‘/wechat‘ component={WechatLayout} />
          </Switch>
        </main>
      </div>
    );
  }
}

export default AppLayout;

WechatLayout.js的代码如下。在这个layout中,我们就可以对访问该路由的用户进行鉴权。如果没有权限,我们可以直接限制用户的访问,甚至直接不渲染render中的数据。

例如,我们可以在componentWillMount中或者在render中,根据当前的state数据,对当前用户进行鉴权。如果没有权限,我们就可以将当前页面重定向到没有权限的提示界面。

import React, { Component } from ‘react‘;
import Home from ‘../routes/wechat/home‘;
import { Route, Switch } from ‘react-router-dom‘;
import { connect } from ‘react-redux‘;

class WechatLayout extends Component {
  constructor(props) {
    super(props);

    this.state = {};
  }

  componentWillMount() {
  }

  render() {
    const className = ‘Wechat-Layout‘;

    return (
      <div className={`${className}`}>
        <header>
          Our Manage Layout
        </header>
        <main>
          <Switch>
            <Route path={`${this.props.match.path}/home`} component={Home} />
          </Switch>
        </main>
      </div>
    );
  }
}

const mapStateToProps = state => ({
  reducer: state.wechatLayout
});

export default connect(mapStateToProps)(WechatLayout);

新建routes

新建/src/routes/home/index.js,代码如下。

import React, { Component } from ‘react‘;
import {Link} from ‘react-router-dom‘;

class Home extends Component {
  constructor(props) {
    super(props);

    this.state = {};
  }

  render() {
    const className = ‘Home‘;

    return (
      <div className={`${className}`}>
        <h1>This is Home</h1>
        <div><Link to={‘/wechat/home‘}>Manage Home</Link></div>
      </div>
    );
  }
}

export default Home;

新建/src/routes/wechat/home/index.js, 代码如下。在代码中可以看到,触发reducer很简单,只需要调用dispatch方法即可。dispatch中的payload就是该请求所带的参数,该参数会传到saga中间层,去调用真正的后端请求。并在请求返回成功之后,调用put方法更新state。

import React, { Component } from ‘react‘;
import {connect} from "react-redux";

class Home extends Component {
  constructor(props) {
    super(props);

    this.state = {};
  }

  componentWillMount() {
    this.props.dispatch({ type: ‘WATCH_GET_PROJECT‘, payload: { projectName: ‘tap4fun‘ } });
  }

  render() {
    const className = ‘Wechat-Home‘;

    return (
      <div className={`${className}`}>
        <h1>Home</h1>
        <h2>The project name is : { this.props.reducer.projectName }</h2>
      </div>
    );
  }
}

const mapStateToProps = state => ({
  reducer: state.wechat
});

export default connect(mapStateToProps)(Home)

新建container

/src下新建container,在container中新建文件AppContainer.js。我们整个react应用都装在这个容器里面。AppContainer.js的代码如下。

而其中的Provider组件,将包裹我们应用的容器AppLayout包在其中,使得下面的所有子组件都可以拿到state。Provider接受store参数作为props,然后通过context往下传递。

import React, { Component } from ‘react‘;
import PropTypes from ‘prop-types‘;
import { Provider } from ‘react-redux‘;
import { BrowserRouter as Router } from ‘react-router-dom‘;
import AppLayout from ‘../layout/AppLayout‘;

class AppContainer extends Component {
  constructor(props) {
    super(props);

    this.state = {};
  }

  static propTypes = {
    store: PropTypes.object.isRequired
  };

  render() {
    const { store } = this.props;

    return (
      <Provider store={store}>
        <Router>
          <AppLayout />
        </Router>
      </Provider>
    );
  }
}

export default AppContainer;

修改项目入口文件

更新/src/index.js,代码如下。在此处会将create出来的store容器当作属性传入到Appcontainer中,作为我们应用的状态容器。

import React from ‘react‘;
import ReactDOM from ‘react-dom‘;
import ‘./index.css‘;
import * as serviceWorker from ‘./serviceWorker‘;
import AppContainer from ‘./container/AppContainer‘;
import createStore from ‘./store/createStore‘;

const store = createStore();

ReactDOM.render(<AppContainer store={store} />, document.getElementById(‘root‘));

// If you want your app to work offline and load faster, you can change
// unregister() to register() below. Note this comes with some pitfalls.
// Learn more about service workers: http://bit.ly/CRA-PWA
serviceWorker.unregister();

新建store

新建/src/store/craeteStore.js,代码如下。通过以下的方式,我们可以给redux添加很多中间件,甚至是自己写的中间件。

比如,我们可以自己实现一个日志中间件,然后添加到中间件数组middleWares中,在创建redux的store的时候,应用我们自己写的中间件。

import { applyMiddleware, compose, createStore } from ‘redux‘;
import createSagaMiddleware  from ‘redux-saga‘;
import rootReducer from ‘../reducers‘;
import rootSaga  from ‘../saga‘;

export default function configureStore(preloadedState) {
  // 创建saga中间件
  const sagaMiddleware = createSagaMiddleware();
  const middleWares = [sagaMiddleware];
  const middlewareEnhancer = applyMiddleware(...middleWares);

  const enhancers = [middlewareEnhancer];
  const composedEnhancers = compose(...enhancers);

  // 创建存储容器
  const store = createStore(rootReducer, preloadedState, composedEnhancers);
  sagaMiddleware.run(rootSaga);

  return store;
}

在这引入了redux-saga。我之前在使用redux的时候,几乎在每个模块都要写相应的action和reducer,然后在相应的模块文件中引入action的函数,然后在使用mapDispatchToProps将该函数注入到props中,在相应的函数中调用。并且,一个action不能复用,即使触发的是相同的reducer。这样就会出现很多重复性的代码,新增一个模块的工作也相对繁琐了很多。

但是使用了redux-saga之后,只需要在reducer中定义好相应类型的操作和saga就可以了。不需要定义action的函数,不需要在文件中引入action中函数,甚至连mapDispatchToProps都不需要,直接使用this.props.dispatch({ ‘type‘: ‘WATCH_GET_PROJECT‘ })就可以调用。而且,action可以复用。

新建saga

新建/src/saga/index.js,代码如下。

import { put, takeEvery } from ‘redux-saga/effects‘;
import { delay } from ‘redux-saga‘;

export function* fetchProject() {
  yield delay(1000);
  yield put({ type: ‘GET_PROJECT‘ })
}

export default function * rootSaga() {
  yield takeEvery(‘WATCH_GET_PROJECT‘, fetchProject);
}

新建reducer

新建/src/reducers/wechat.js,代码如下。

const initialState = {
  projectName: null
};

export default function counter(state = initialState, action) {
  let newState = state;
  switch (action.type) {
    case ‘GET_PROJECT‘:
      newState.projectName = action.payload.projectName;
      break;
    default:
      break;
  }
  return {...newState}
}

新建/src/reducers/index.js,代码如下。

import { combineReducers } from ‘redux‘;
import Wechat from ‘./wechat‘;

export default combineReducers({
  wechat: Wechat
});

在这里我们使用了combineReducers。在之前的基于redux的应用程序中,常见的state结构就是一个简单的JavaScript对象。

重新启动应用

到此处,重新启动应用,就可以在http://localhost:3000/wechat/home下看到从reducer中取出的数据。

在页面中,我们就可以通过代码this.props.dispatch的方式,来触发action。

参考

项目源代码

Github仓库

以上是关于Axon Framework - saga 基础设施的主要内容,如果未能解决你的问题,请参考以下文章

Axon Framework - Spring Boot 集成

Axon Framework 扩展 - Spring AMQP

DDD随笔-Axon

Spring Micro 服务如何知道 Axon Server Port

electron+react-redux-saga基础项目配置

Axon 服务器命令不包含路由键