事务管理之数据一致性

Posted 陈小兵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了事务管理之数据一致性相关的知识,希望对你有一定的参考价值。

事务管理之数据一致性

1 传统单体应用事务一致性

1.1              本地事务

针对传统单体应用,单个关系型数据库的场景,比如:oracle , DB2

使用ACID 事务

 

 

1.2              分布式事务

针对传统单体应用,2个以上关系型数据库的场景,比如:oracle , DB2

基于XA的二次阶段提交方案(利用XA分布式事务实现)Oracle、DB2这些商业数据库都实现了XA接口

XA分布式事务是由一个或者多个Resource Managerd,一个事务管理器Transaction Manager以及一个应用程序 Application Program组成。

第一阶段(准备阶段)

 

 

TM 通知所有参与事务的各个 RM,给每个 RM 发送 prepare 消息。

RM 接收到消息后进入准备阶段后,要么直接返回失败,要么创建并执行本地事务,写本地事务日志(redo 和 undo 日志),但是 不提交(此处只保留最后一步耗时最少的提交操作给第二阶段执行)。

 

第二阶段(提交 / 回滚阶段)

 

 

TM 收到 RM 准备阶段的失败消息或者获取 RM 返回消息超时,则直接给 RM 发送回滚(rollback)消息,否则发送提交(commit)消息。

RM 根据 TM 的指令执行提交或者回滚,执行完成后释放所有事务处理过程中使用的锁(最后阶段释放锁)。

 

1.3              XA分布式事务配置实例:

引入jta实例

<dependency> 

    <groupId>com.atomikos</groupId> 

    <artifactId>transactions-jdbc</artifactId> 

    <version>3.7.0</version> 

</dependency> 

<dependency> 

    <groupId>javax.transaction</groupId> 

    <artifactId>jta</artifactId> 

    <version>1.1</version> 

</dependency> 

 

  1. mysql数据源配置:
<!-- mysql数据源 -->
<bean id="mysqlDS" class="com.atomikos.jdbc.AtomikosDataSourceBean"
      init-method="init" destroy-method="close">
    <description>mysql xa datasource</description>
    <property name="uniqueResourceName">
        <value>mysql_ds</value><!-- 自定义,唯一的数据源名称-->
    </property>
    <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
    <property name="xaProperties">
        <props>
            <prop key="user">userName</prop>
            <prop key="password">password</prop>
            <prop key="URL">jdbc\\:mysql\\://127.0.0.1\\:3306/dataBaseName?autoReconnect\\=true</prop>
        </props>
    </property>
    <!-- 连接池里面连接的个数-->
    <property name="poolSize" value="3"/>
</bean>

 

 

  1. oracle数据源配置
<!-- oracle数据源 -->
<bean id="oracleDS" class="com.atomikos.jdbc.AtomikosDataSourceBean"
      init-method="init" destroy-method="close">
    <description>oracle xa datasource</description>
    <property name="uniqueResourceName">
        <value>oracle_ds</value><!-- 自定义,唯一的数据源名称-->
    </property>
    <property name="xaDataSourceClassName">
        <value>oracle.jdbc.xa.client.OracleXADataSource</value>
    </property>
    <property name="xaProperties">
        <props>
            <prop key="user">userName</prop>
            <prop key="password">password</prop>
            <prop key="URL">jdbc\\:oracle\\:thin\\:@127.0.0.1\\:1521\\:dataBaseName</prop>
        </props>
    </property>
    <!-- 连接池里面连接的个数 -->
    <property name="poolSize" value="3"/>
</bean>

 

 

  1. JTA事务配置
<!-- atomikos事务管理器 -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
      init-method="init" destroy-method="close">
    <description>UserTransactionManager</description>
    <property name="forceShutdown">
        <value>true</value>
    </property>
</bean>

<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>

<!-- spring 事务管理器 -->
<bean id="springTransactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager">
        <ref bean="atomikosTransactionManager" />
    </property>
    <property name="userTransaction">
        <ref bean="atomikosUserTransaction" />
    </property>
</bean>

<!-- spring 事务模板 我在项目当中用的是编程式事务-->
<bean id="transactionTemplate"
      class="org.springframework.transaction.support.TransactionTemplate">
        <property name="transactionManager">
        <ref bean="springTransactionManager" />
    </property>
</bean>
 

 

4.两个数据源分别使用

<bean id="simpleJdbcTemplate" class="org.springframework.jdbc.core.simple.SimpleJdbcTemplate">
    <constructor-arg>
        <ref bean="mysqlDS" />
    </constructor-arg>
</bean>

<bean id="simplejdbcTemplateOra" class="org.springframework.jdbc.core.simple.SimpleJdbcTemplate">
    <constructor-arg>
        <ref bean="oracleDS" />
    </constructor-arg>
</bean>

<!-- spring atomikos 配置 结束-->

<!-- 接下来就是具体的Dao的配置 -->
<bean id="oracleJtaDao" class="com.dao.TerminalOracleJtaDao">
    <property name="simplejdbcTemplateOra">
        <ref bean="simplejdbcTemplateOra" />
    </property>
    <property name="transactionTemplate">
        <ref bean="transactionTemplate" />
    </property>
</bean>

<bean id="myaqlJtaDao" class="com.dao.TerminalMyaqlJtaDao">
    <property name="simpleJdbcTemplate">
        <ref bean="simpleJdbcTemplate" />
    </property>
    <property name="transactionTemplate">
        <ref bean="transactionTemplate" />
    </property>
</bean>

 

 

 

2 微服务架构下的数据一致性

XA分布式事务,局限于关系型数据库,而微服务架构下,通常会含由关系型数据库和非关系型数据库并用

微服务架构下的3种方案,实现数据最终一致性:

 

 

 

2.1              可靠消息 方案(跨平台业务)

利用MQ实现二阶段提交方案

此方案涉及 3 个模块:

上游应用,执行业务并发送 MQ 消息。

可靠消息服务和 MQ 消息组件,协调上下游消息的传递,并确保上下游数据的一致性。

下游应用,监听 MQ 的消息并执行自身业务。

 

 

 

2.1.1     第一阶段:上游应用执行业务并发送 MQ 消息

上游应用将本地业务执行和消息发送绑定在同一个本地事务中,保证要么本地操作成功并发送 MQ 消息,要么两步操作都失败并回滚。

上游应用和可靠消息之间的业务交互图如下

 

 

  1. 上游应用发送待确认消息到可靠消息系统
  2. 可靠消息系统保存待确认消息并返回
  3. 上游应用执行本地业务
  4. 上游应用通知可靠消息系统确认业务已执行并发送消息。
  5. 可靠消息系统修改消息状态为发送状态并将消息投递到 MQ 中间件。

以上每一步都可能出现失败情况,分析一下这 5 步出现异常后上游业务和消息发送是否一致:

 

 

上游应用执行完成,下游应用尚未执行或执行失败

2.1.2     第二阶段:下游应用监听 MQ 消息并执行业务

下游应用监听 MQ 消息并执行业务,并且将消息的消费结果通知可靠消息服务。

可靠消息的状态需要和下游应用的业务执行保持一致,可靠消息状态不是已完成时,确保下游应用未执行,可靠消息状态是已完成时,确保下游应用已执行。

 

 

  1. 下游应用监听 MQ 消息组件并获取消息
  2. 下游应用根据 MQ 消息体信息处理本地业务
  3. 下游应用向 MQ 组件自动发送 ACK 确认消息被消费
  4. 下游应用通知可靠消息系统消息被成功消费,可靠消息将该消息状态更改为已完成。

以上每一步都可能出现失败情况,分析一下这 4 步出现异常后下游业务和消息状态是否一致:

 

 

通过分析以上两个阶段可能失败的情况,为了确保上下游数据的最终一致性,在可靠消息系统中,需要开发 消息状态确认消息重发 两个功能

2.1.3        消息状态确认

可靠消息服务定时监听消息的状态,如果存在状态为待确认并且超时的消息,则表示上游应用和可靠消息交互中的步骤 4 或者 5 出现异常。

可靠消息则携带消息体内的信息向上游应用发起请求查询该业务是否已执行。上游应用提供一个可查询接口供可靠消息追溯业务执行状态,如果业务执行成功则更改消息状态为已发送,否则删除此消息确保数据一致。具体流程如下:

 

 

1.可靠消息查询超时的待确认状态的消息

2.向上游应用查询业务执行的情况

3.业务未执行,则删除该消息,保证业务和可靠消息服务的一致性。业务已执行,则修改消息状态为已发送,并发送消息到 MQ 组件。

 

2.1.4     消息重发

消息已发送则表示上游应用已经执行,接下来则确保下游应用也能正常执行。

可靠消息服务发现可靠消息服务中存在消息状态为已发送并且超时的消息,则表示可靠消息服务和下游应用中存在异常的步骤,无论哪个步骤出现异常,可靠消息服务都将此消息重新投递到 MQ 组件中供下游应用监听。

下游应用监听到此消息后,在保证幂等性的情况下重新执行业务并通知可靠消息服务此消息已经成功消费,最终确保上游应用、下游应用的数据最终一致性。具体流程如下:

 

 

1.可靠消息服务定时查询状态为已发送并超时的消息

2.可靠消息将消息重新投递到 MQ 组件中

3.下游应用监听消息,在满足幂等性的条件下,重新执行业务。

4.下游应用通知可靠消息服务该消息已经成功消费。

通过消息状态确认和消息重发两个功能,可以确保上游应用、可靠消息服务和下游应用数据的最终一致性。

当然在实际接入过程中,需要引入 人工干预 功能。比如引入重发次数限制,超过重发次数限制的将消息修改为死亡消息,等待人工干预。

 

2.2              TCC(Try-Confirm-Cancel)方案(同技术栈,平台内部)

TCC 方案是二阶段提交的 另一种实现方式,它涉及 3 个模块: 主业务从业务活动管理器(协作者)

 

 

第一阶段:主业务服务分别调用所有从业务服务的 try 操作,并在活动管理器中记录所有从业务服务。当所有从业务服务 try 成功或者某个从业务服务 try 失败时,进入第二阶段。

第二阶段:活动管理器根据第一阶段从业务服务的 try 结果来执行 confirm 或 cancel 操作。如果第一阶段所有从业务服务都 try 成功,则协作者调用所有从业务服务的 confirm 操作,否则,调用所有从业务服务的 cancel 操作。

在第二阶段中,confirm 和 cancel 同样存在失败情况,所以需要对这两种情况做 异常处理 以保证数据一致性。

Confirm 失败:则回滚所有 confirm 操作并执行 cancel 操作。

Cancel 失败:从业务服务需要提供自动 cancel 机制,以保证 cancel 成功。

 

基于 HTTP 协议的 TCC 实现。具体的实现流程如下:

 

 

  1. 主业务服务调用从业务服务的 try 操作,并获取 confirm/cancel 接口和超时时间。
  2. 如果从业务都 try 成功,主业务服务执行本地业务,并将获取的 confirm/cancel 接口发送给活动管理器,活动管理器会顺序调用从业务 1 和从业务 2 的 confirm 接口并记录请求状态,如果请求成功,则通知主业务服务提交本地事务。如果 confirm 部分失败,则活动管理器会顺序调用从业务 1 和从业务 2 的 cancel 接口来取消 try 的操作。
  3. 如果从业务部分或全部 try 失败,则主业务直接回滚并结束,而 try 成功的从业务服务则通过定时任务来处理处于 try 完成但超时的数据,将这些数据做回滚处理保证主业务服务和从业务服务的数据一致。

 

2.3              最大努力通知方案 (一致性低要求)

最大努力通知方案涉及三个模块:

l  上游应用,发消息到 MQ 队列。

l  下游应用(例如短信服务、邮件服务),接受请求,并返回通知结果。

l  最大努力通知服务,监听消息队列,将消息存储到数据库中,并按照通知规则调用下游应用的发送通知接口。

具体流程如下:

 

 

  1. 上游应用发送 MQ 消息到 MQ 组件内,消息内包含通知规则和通知地址
  2. 最大努力通知服务监听到 MQ 内的消息,解析通知规则并放入延时队列等待触发通知
  3. 最大努力通知服务调用下游的通知地址,如果调用成功,则该消息标记为通知成功,如果失败则在满足通知规则(例如 5 分钟发一次,共发送 10 次)的情况下重新放入延时队列等待下次触发。

最大努力通知服务表示在 不影响主业务 的情况下,尽可能地确保数据的一致性。它需要开发人员根据业务来指定通知规则,在满足通知规则的前提下,尽可能的确保数据的一致,以尽到最大努力的目的。

根据不同的业务可以定制不同的通知规则,比如通知支付结果等相对严谨的业务,可以将通知频率设置高一些,通知时间长一些,比如隔 5 分钟通知一次,持续时间 1 小时。

如果不重要的业务,比如通知用户积分增加,则可以将通知频率设置低一些,时间短一些,比如 10 分钟通知一次,持续 30 分钟。

代入上面提到的支付成功短信通知用户的案例,通过最大努力通知方案,当支付成功后,将消息发送到 MQ 中间件,在消息中,定义发送规则为 5 分钟一次,最大发送数为 10 次。

最大努力通知服务监听 MQ 消息并根据规则调用消息通知服务(短信服务)的消息发送接口,并记录每次调用的日志信息。在通知成功或者已通知 10 次时,停止通知。

 

以上是关于事务管理之数据一致性的主要内容,如果未能解决你的问题,请参考以下文章

数据库事务之隔离级别

Spring(十四)之事务

分布式事务数据最终一致性之serviceComb-Saga使用方法

Spring框架之事务处理源码分析

Golang之mysql

RocketMQ事务消息篇之事务消息的介绍