分布式事务

Posted 沉梦匠心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务相关的知识,希望对你有一定的参考价值。


title: 分布式事务
tags: [分布式事务]
date: 2020/1/16 20:26:25
categories:

  • java
  • 分布式事务

基础概念

本地事务

关系型数据库, ACID

产生分布式的场景

分布式服务(跨网络),单应用 分布式数据库(多数据源),跨jvm服务

传统分布式事务模型

  • 产品模型产生经历

  • 存在差异 -> parttern(若干模式) -> 产品

  • 有关算法

    paxos,zab,raft

  • 事务延迟问题

    • (预提交 日志记录 网络延迟)
    • 延迟影响吞吐
  • 分布式下的mvcc顺序问题

    • 逻辑时间戳(happen-before)

      • 物理时间不一定一致

      • 单机下可以有happen-before关系

      • scn(oracle),trx_id(innodb)

    • mvcc存储模式

      pg-xc : 发号器 (version=version+1)

      TrueTimeAPI,原子时钟,NTP

补偿事务

TCC

补偿事务是一种在业务端实施业务逆向操作事务,来保证业务数据一致性的方式。

  1. 不同的业务要写不同的补偿事务,不具备通用性

  2. 没有考虑补偿事务的失败

  3. 如果业务流程很复杂,if/else会嵌套非常多层

事务后置提交优化

一个事务,分成执行与提交两个阶段,执行的时间其实是很长的,而commit的执行其实是很快

副作用

  • 事务提交后置可以降低【数据不一致】的出现概率

  • 事务提交时会释放数据库的连接,如果不后置事务提交,第一个库事务提交,数据库连接就释放了,后置事务提交的方案,所有库的连接,要等到所有事务执行完才释放。这就意味着,数据库连接占用的时间增长了,系统整体的吞吐量降低了。

使用示例

trx1.exec();trx1.commit();
trx2.exec();trx2.commit();
trx3.exec();trx3.commit();
优化为:
trx1.exec();trx2.exec();trx3.exec();
trx1.commit();trx2.commit();trx3.commit();

/*  1、改动成本极低,
	2、不能彻底解决多库分布式事务数据一致性问题,但能大大降低数据不一致的概率
	3、带来的副作用是数据库连接占用时间会增长,吞吐量会降低。
	4、对于一致性与吞吐量的折衷,还需要业务架构师谨慎权衡折衷
*/

CAP & BASE

cap组合 : CP,AP

  • 分布式环境下随着服务节点越来越多,如果想保证数据一致性,代价是越来越高了

base理论

  • 基本可用,柔性状态(软状态),最终一致性

2PC

  • prepare pharse

    • TM给每个参与者发送prepare消息,参与者执行本地事务但是不提交

    • 并记录undo/redo记录

      • undo 修改前数据
      • redo 修改后数据(用于提交后写入数据文件)
  • commit pharse

    • commit
    • rollback
    • 释放锁资源

mysql,oracle 都实现了2pc协议

XA 方案

所谓的 XA 方案,即:两阶段提交,有一个TM(事务管理器)的概念,负责协调多个数据库(资源管理器)的事务,事务管理器先问问各个数据库你准备好了吗?如果每个数据库都回复 ok,那么就正式提交事务,在各个数据库上执行操作;如果任何其中一个数据库回答不 ok,那么就回滚事务。

这种分布式事务方案,比较适合单块应用里,跨多个库的分布式事务,而且因为严重依赖于数据库层面来搞定复杂的事务,效率很低,绝对不适合高并发的场景。如果要玩儿,那么基于 Spring + JTA 就可以搞定

  • XA事务

    ShardingSphere默认的XA事务管理器为Atomikos,在项目的logs目录中会生成xa_tx.log, 这是XA崩溃恢复时所需的日志,请勿删除。

  • 需要数据库支持XA

  • 资源要锁定到两个阶段结束,性能较差

Seata AT实现2PC

  • AT模式(2PC),TCC模式

  • 设计思想上业务无侵入 AT模式

  • TC + 全局事务 + 分支事务

角色

  • TC (事务协调器)
    • 独立布署运行,维护全局事务的运行状态
    • 接收TM指令发起全局事务的提交和回滚,负责和RM通信协调分支事务
  • TM(事务管理器)
    • 嵌入到应用程序中,开启全局事务,上报指令到TC
  • RM(数据库实例-资源管理器)
    • 控制分支事务

具体流程

用户注册(用户服务) 送积分 (积分服务)

  1. 用户服务的TM是全局事务的开端

  2. 用户服务的TM向TC申请开启一个全局事务,创建成功并生成一个xid

  3. 用户服务的RM向TC注册分支事务,该分支事务在执行完新增用户逻辑后,将其纳入到xid对应全局事务的管辖

  4. 用户执行分支事务,向用户表插入一条记录

    • 使用了数据源代理的方式向undo_log中添加数据
    • undo_log中保存了修改前和修改后的值
  5. 远程调用积分服务时(xid在微服务的调用链路的上下文中传播)

    • 积分服务RM向TC注册分支事务,该分支事务在执行积分逻辑后,将其纳入到xid对应全局事务的管辖
  6. 积分服务执行分支事务,向积分表插入一条记录,执行后返回用户服务

  7. 用户服务分支事务执行完毕

  8. TM向TC发起xid全局事务的提交或回滚

  9. TC 调度xid对应的所有分支事务完成提交或回滚

    • 提交成功 删除undo_log
    • 解析undo_log ,把执行成功的事务 生成反向操作语句,执行回滚

与传统2PC的区别

  • Seata AT 的RM角色是以jar包的形式布署在应用程序的一侧
  • Seata at 在第一阶段就将本地事务提交了

案例

BASE柔性事务管理器 SEATA-AT
  • https://github.com/seata/seata-workshop

  • 在每一个分片数据库实例中执创建undo_log表(以MySQL为例)

  • context:jackson ; rollback_info : json数据

    CREATE TABLE IF NOT EXISTS `undo_log`
    (
      `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT \'increment id\',
      `branch_id`     BIGINT(20)   NOT NULL COMMENT \'branch transaction id\',
      `xid`           VARCHAR(100) NOT NULL COMMENT \'global transaction id\',
      `context`       VARCHAR(128) NOT NULL COMMENT \'undo_log context,such as serialization\',
      `rollback_info` LONGBLOB     NOT NULL COMMENT \'rollback info\',
      `log_status`    INT(11)      NOT NULL COMMENT \'0:normal status,1:defense status\',
      `log_created`   DATETIME     NOT NULL COMMENT \'create datetime\',
      `log_modified`  DATETIME     NOT NULL COMMENT \'modify datetime\',
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
    ) ENGINE = InnoDB
      AUTO_INCREMENT = 1
      DEFAULT CHARSET = utf8 COMMENT =\'AT transaction mode undo table\';
    

@GlobalTransactional => xid {branchids }

fegin远程调用时(自动加入了xid)

TCC

适用场景

​ 对于常见的微服务系统,大部分接口调用是同步的,也就是一个服务直接调用另外一个服务的接口。

这个时候,用 TCC 分布式事务方案来保证各个接口的调用,要么一起成功,要么一起回滚,是比较合适的。

TCC解释

  • Try: 其实就是所谓的 TCC 分布式事务中的第一个 T 字母代表的阶段,也就是 Try 阶段。

总结上述过程,如果你要实现一个 TCC 分布式事务,首先你的业务的主流程以及各个接口提供的业务含义,不是说直接完成那个业务操作,而是完成一个 Try 的操作。

这个操作,一般都是锁定某个资源,比如订单状态为“修改中”,库存锁定x(可销售库存减少x),客户积分预增x,仓库出库单(状态为UNKNOW)等等

  • Confirm:

  • Cancel:

  • try ,confirm , cancel

  • 需要实现确认和补偿逻辑

  • 需要支持幂等

  • 性能提升:具体业务来实现控制资源锁的粒度变小,不会锁定整个资源。

    数据最终一致性:基于 Confirm 和 Cancel 的幂等性,保证事务最终完成确认或者取消,保证数据的一致性。

    可靠性:解决了 XA 协议的协调者单点故障问题,由主业务方发起并控制整个业务活动,业务活动管理器也变成多点,引入集群。

  • TCC 的 Try、Confirm 和 Cancel 操作功能要按具体业务来实现,业务耦合度较高,提高了开发成本。

  • 2PC机制的业务阶段 等价于 TCC机制的try业务阶段;

    2PC机制的提交阶段(prepare & commit) 等价于 TCC机制的提交阶段(confirm);
    2PC机制的回滚阶段(rollback) 等价于 TCC机制的回滚阶段(cancel)。

  • 2PC: begin -> 业务逻辑 -> prepare -> commit ;TCC: begin -> 业务逻辑(try业务) -> commit(comfirm业务)

Seata TCC

Hmily TCC

  • 支持嵌套事务
  • 采用disruptor框架进行事务日志异步读写
  • 本地事务存储支持 redis,file ,mysql,zk,mongodb
  • 事务日志序列化支持:json,protobuf,hessain,kryo
  • 不需要协调者(利用aop拦截)
// 事务发起方
@Hmily (confirmMethod = "commit" , cancelMethod = "rollback")
@Transational
public void transfer(String account,long money){
    //get 全局事务id
    String txId = HmilyTransationContextLocal.getInstance().get().getTransId();
    //try幂等判断
    if(accountDao.isExistTry(txId) > 0 ){ }
    //try悬挂处理
    if(accountDao.isExistCancel(txId) > 0 || accountDao.isExistConfirm(txId) > 0 ){}
    //业务处理 扣款
    
    accountDao.subtractAccount(account,money){};
    //try执行记录,用于幂等判断
    accountDao.addTryLog(txId);
    //远程调用对方账户的添加余额
    //rpc,feginClient
    
}

public void commit(String account,long money){
    
}
@Transactional
public void rollback(String account,long money){
     //get 全局事务id
    String txId = HmilyTransationContextLocal.getInstance().get().getTransId();
     //cancel幂等判断
    if(accountDao.isExistCancel(txId) > 0 ){ }
    // 空回滚处理,如果try没有执行,cancel不允许执行
    if(accountDao.isExistTry(txId) <= 0 ){ }
    // 返回余额
    accountDao.addAccountBanlance(account,money);
    //cancel执行记录,用于幂等判断
    accountDao.addCancelLog(txId);
    
}


//对方账号
@Transactional    
public void commit(String account,long money){
     //get 全局事务id
    String txId = HmilyTransationContextLocal.getInstance().get().getTransId();
     //confirm幂等判断
    if(accountDao.isExistConfirm(txId) > 0 ){ }
   
    // 添加余额
    accountDao.addAccountBanlance(account,money);
     //confirm执行记录,用于幂等判断
    accountDao.addConfirmLog(txId);
    
}
 

使用总结

https://www.cnblogs.com/jajian/p/10014145.html

首先需要选择某种 TCC 分布式事务框架,各个服务里就会有这个 TCC 分布式事务框架在运行。

然后你原本的一个接口,要改造为 3 个逻辑,Try-Confirm-Cancel:

  • 先是服务调用链路依次执行 Try 逻辑。
  • 如果都正常的话,TCC 分布式事务框架推进执行 Confirm 逻辑,完成整个事务。
  • 如果某个服务的 Try 逻辑有问题,TCC 分布式事务框架感知到之后就会推进执行各个服务的 Cancel 逻辑,撤销之前执行的各种操作。

这就是所谓的 TCC 分布式事务。TCC 分布式事务的核心思想,说白了,就是当遇到下面这些情况时:

  • 某个服务的数据库宕机了。
  • 某个服务自己挂了。
  • 那个服务的 Redis、Elasticsearch、MQ 等基础设施故障了。
  • 某些资源不足了,比如说库存不够这些。

先来 Try 一下,不要把业务逻辑完成,先试试看,看各个服务能不能基本正常运转,能不能先冻结我需要的资源。

如果 Try 都 OK,也就是说,底层的数据库、Redis、Elasticsearch、MQ 都是可以写入数据的,并且你保留好了需要使用的一些资源(比如冻结了一部分库存)。

接着,再执行各个服务的 Confirm 逻辑,基本上 Confirm 就可以很大概率保证一个分布式事务的完成了。

那如果 Try 阶段某个服务就失败了,比如说底层的数据库挂了,或者 Redis 挂了,等等。

此时就自动执行各个服务的 Cancel 逻辑,把之前的 Try 逻辑都回滚,所有服务都不要执行任何设计的业务逻辑。保证大家要么一起成功,要么一起失败。

等一等,你有没有想到一个问题?如果有一些意外的情况发生了,比如说订单服务突然挂了,然后再次重启,TCC 分布式事务框架是如何保证之前没执行完的分布式事务继续执行的呢?

所以,TCC 事务框架都是要记录一些分布式事务的活动日志的,可以在磁盘上的日志文件里记录,也可以在数据库里记录。保存下来分布式事务运行的各个阶段和状态。

问题还没完,万一某个服务的 Cancel 或者 Confirm 逻辑执行一直失败怎么办呢?

那也很简单,TCC 事务框架会通过活动日志记录各个服务的状态。举个例子,比如发现某个服务的 Cancel 或者 Confirm 一直没成功,会不停的重试调用它的 Cancel 或者 Confirm 逻辑,务必要它成功!

当然了,如果你的代码没有写什么 Bug,有充足的测试,而且 Try 阶段都基本尝试了一下,那么其实一般 Confirm、Cancel 都是可以成功的!

异常处理

  • 空回滚

    • 没有执行try却调用了第二阶段的cancel

    • 产生原因

      当一个分支事务所在服务当机或网络异常,分支事务调用记录失败,故障恢复后,分布式事务进行回滚会调用第二阶段的Cancel方法,从而形成空回滚

    • 解决

      • 利用try执行记录,判断是否为空回滚 (try_log)
  • 幂等

    • 因为重试机制
    • 也只可以利用confirm执行记录进行判断
    • confirm_log
  • 悬挂

    • cancel接口比try先执行

      try阶段rpc调用分支事务超时,触发cancel。但是try的rpc调用这个时候又到达了,

      这样就锁定了部分业务资源,且这部分资源没人能够处理。

    • 解决

      • 如果第二阶段已经执行,就不要再继续执行第一阶段
      • cancel_log

可靠消息最终一致性

  • 问题1:本地事务和消息发送的原子性(同一本地事务控制)

    • 本地事务执行成功,消息发送超时(可能MQ已经收到),本地事务回滚
      • 本地消息表方案 + 定时任务
      • rocketmq事务消息
  • 问题2:100%投递到MQ, 100%投递到参与方

    • 本地消息表方案 + 定时任务
    • 重试+ack机制
  • 问题3:由问题2带来的消息幂等性问题

  • 异步调用

实际系统的开发过程中,可能服务间的调用是异步的。也就是说,一个服务发送一个消息给 MQ( RocketMQ、RabbitMQ、Kafka)等等,然后,另外一个服务从 MQ 消费到一条消息后进行处理。这就成了基于 MQ 的异步调用了

可靠消息最终一致性方案的核心流程

①上游服务投递消息

如果要实现可靠消息最终一致性方案,一般你可以自己写一个可靠消息服务,实现一些业务逻辑。

首先,上游服务需要发送一条消息给可靠消息服务。这条消息说白了,你可以认为是对下游服务一个接口的调用,里面包含了对应的一些请求参数。

然后,可靠消息服务就得把这条消息存储到自己的数据库里去,状态为“待确认”。

接着,上游服务就可以执行自己本地的数据库操作,根据自己的执行结果,再次调用可靠消息服务的接口。

如果本地数据库操作执行成功了,那么就找可靠消息服务确认那条消息。如果本地数据库操作失败了,那么就找可靠消息服务删除那条消息。

此时如果是确认消息,那么可靠消息服务就把数据库里的消息状态更新为“已发送”,同时将消息发送给 MQ。

这里有一个很关键的点,就是更新数据库里的消息状态和投递消息到 MQ。这俩操作,你得放在一个方法里,而且得开启本地事务。

啥意思呢?如果数据库里更新消息的状态失败了,那么就抛异常退出了,就别投递到 MQ;如果投递 MQ 失败报错了,那么就要抛异常让本地数据库事务回滚。这俩操作必须得一起成功,或者一起失败。

如果上游服务是通知删除消息,那么可靠消息服务就得删除这条消息。

②下游服务接收消息

下游服务就一直等着从 MQ 消费消息好了,如果消费到了消息,那么就操作自己本地数据库。

如果操作成功了,就反过来通知可靠消息服务,说自己处理成功了,然后可靠消息服务就会把消息的状态设置为“已完成”。

③如何保证上游服务对消息的 100% 可靠投递?

上面的核心流程大家都看完:一个很大的问题就是,如果在上述投递消息的过程中各个环节出现了问题该怎么办?

我们如何保证消息 100% 的可靠投递,一定会从上游服务投递到下游服务?别着急,下面我们来逐一分析。

  • 上游调用可靠消息服务异常

如果上游服务给可靠消息服务发送待确认消息的过程出错了,那没关系,上游服务可以感知到调用异常的,就不用执行下面的流程了,这是没问题的。

  • 本地事务执行后通知可靠消息服务失败 或者 MQ投递失败

    可靠消息服务中的 消息状态变更(已发送)和MQ投递操作必须在一个本地事务中

    定时任务+回查本地事务接口

如果上游服务操作完本地数据库之后,通知可靠消息服务确认消息或者删除消息的时候,出现了问题。

比如:没通知成功,或者没执行成功,或者是可靠消息服务没成功的投递消息到 MQ。这一系列步骤出了问题怎么办?

其实也没关系,因为在这些情况下,那条消息在可靠消息服务的数据库里的状态会一直是“待确认”。

此时,我们在可靠消息服务里开发一个后台定时运行的线程,不停的检查各个消息的状态。

如果一直是“待确认”状态,就认为这个消息出了点什么问题。此时的话,就可以回调上游服务提供的一个接口,问问说,兄弟,这个消息对应的数据库操作,你执行成功了没啊?

如果上游服务答复说,我执行成功了,那么可靠消息服务将消息状态修改为“已发送”,同时投递消息到 MQ。

如果上游服务答复说,没执行成功,那么可靠消息服务将数据库中的消息删除即可。

通过这套机制,就可以保证,可靠消息服务一定会尝试完成消息到 MQ 的投递。

④如何保证下游服务对消息的 100% 可靠接收?

重复通知+幂等设计

那如果下游服务消费消息出了问题,没消费到?或者是下游服务对消息的处理失败了,怎么办?

其实也没关系,在可靠消息服务里开发一个后台线程,不断的检查消息状态。

如果消息状态一直是“已发送”,始终没有变成“已完成”,那么就说明下游服务始终没有处理成功。

此时可靠消息服务就可以再次尝试重新投递消息到 MQ,让下游服务来再次处理。

只要下游服务的接口逻辑实现幂等性,保证多次处理一个消息,不会插入重复数据即可。

⑤基于 RocketMQ 的可靠消息最终一致性方案?

在上面的通用方案设计里,完全依赖可靠消息服务的各种自检机制来确保:

  • 如果上游服务的数据库操作没成功,下游服务是不会收到任何通知。
  • 如果上游服务的数据库操作成功了,可靠消息服务死活都会确保将一个调用消息投递给下游服务,而且一定会确保下游服务务必成功处理这条消息。

通过这套机制,保证了基于 MQ 的异步调用/通知的服务间的分布式事务保障。其实阿里开源的 RocketMQ,就实现了可靠消息服务的所有功能,核心思想跟上面类似。

只不过 RocketMQ 为了保证高并发、高可用、高性能,做了较为复杂的架构实现,非常的优秀。有兴趣的同学,自己可以去查阅 RocketMQ 对分布式事务的支持。

  1. A 系统先发送一个 prepared (half msg) 消息到 RocketMQ,如果这个 prepared 消息发送失败那么就直接取消操作别执行了;

  2. 如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉 RocketMQ 发送确认消息,如果失败就告诉 RocketMQ 回滚消息;

  3. 如果发送了确认消息,那么此时 B 系统会接收到确认消息,然后执行本地的事务;

  4. RocketMQ 会自动定时轮询所有 prepared 消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。

  5. 这个方案里,要是系统 B 的事务失败了咋办?重试咯,自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行回滚,比如 B 系统本地回滚后,想办法通知系统 A 也回滚;或者是发送报警由人工来手工回滚和补偿。

  6. 这个还是比较合适的,目前国内互联网公司大都是这么玩儿的,要不你就用 RocketMQ 支持的,要不你就自己基于类似 ActiveMQ?RabbitMQ?自己封装一套类似的逻辑出来,总之思路就是这样子的。

    • 支持事务消息的MQ,其支持事务消息的方式采用类似于二阶段提交。
    1. 消费者向MQ发送half消息。

    2. MQ Server将消息持久化后,向发送方ack确认消息发送成功。

    3. 消费者开始执行事务逻辑。

    4. 消费者根据本地事务执行结果向MQ Server提交二次确认或者回滚。

    5. MQ Server收到commit状态则将half消息标记可投递状态。

    6. 服务提供者收到该消息,执行本地业务逻辑。返回处理结果。

    https://blog.csdn.net/squirrelanimal0922/article/details/97238041

    RocketMQ主要解决了两个功能:
    1、本地事务与消息发送的原子性问题。
    2、事务参与方接收消息的可靠性。

可靠消息最终一致+高可用又当如何

①自行封装 MQ 客户端组件与故障感知

首先第一点,你要做到自动感知 MQ 的故障接着自动完成降级,那么必须动手对 MQ 客户端进行封装,发布到公司 Nexus 私服上去。

然后公司需要支持 MQ 降级的业务服务都使用这个自己封装的组件来发送消息到 MQ,以及从 MQ 消费消息。

在你自己封装的 MQ 客户端组件里,你可以根据写入 MQ 的情况来判断 MQ 是否故障。

比如说,如果连续 10 次重新尝试投递消息到 MQ 都发现异常报错,网络无法联通等问题,说明 MQ 故障,此时就可以自动感知以及自动触发降级开关。

②基于 KV 存储中队列的降级方案

如果 MQ 挂掉之后,要是希望继续投递消息,那么就必须得找一个 MQ 的替代品。

举个例子,比如我那位朋友的公司是没有高并发场景的,消息的量很少,只不过可用性要求高。此时就可以使用类似 Redis 的 KV 存储中的队列来进行替代。

由于 Redis 本身就支持队列的功能,还有类似队列的各种数据结构,所以你可以将消息写入 KV 存储格式的队列数据结构中去。

PS:关于 Redis 的数据存储格式、支持的数据结构等基础知识,请大家自行查阅了,网上一大堆。

但是,这里有几个大坑,一定要注意一下:

第一个,任何 KV 存储的集合类数据结构,建议不要往里面写入数据量过大,否则会导致大 Value 的情况发生,引发严重的后果。

因此绝不能在 Redis 里搞一个 Key,就拼命往这个数据结构中一直写入消息,这是肯定不行的。

第二个,绝对不能往少数 Key 对应的数据结构中持续写入数据,那样会导致热 Key 的产生,也就是某几个 Key 特别热。

大家要知道,一般 KV 集群,都是根据 Key 来 Hash 分配到各个机器上的,你要是老写少数几个 Key,会导致 KV 集群中的某台机器访问过高,负载过大。

基于以上考虑,下面是笔者当时设计的方案:

  • 根据它们每天的消息量,在 KV 存储中固定划分上百个队列,有上百个 Key 对应。
  • 这样保证每个 Key 对应的数据结构中不会写入过多的消息,而且不会频繁的写少数几个 Key。
  • 一旦发生了 MQ 故障,可靠消息服务可以对每个消息通过 Hash 算法,均匀的写入固定好的上百个 Key 对应的 KV 存储的队列中。

同时需要通过 ZK 触发一个降级开关,整个系统在 MQ 这块的读和写全部立马降级。

③下游服务消费 MQ 的降级感知

下游服务消费 MQ 也是通过自行封装的组件来做的,此时那个组件如果从 ZK 感知到降级开关打开了,首先会判断自己是否还能继续从 MQ 消费到数据?

如果不能了,就开启多个线程,并发的从 KV 存储的各个预设好的上百个队列中不断的获取数据。

每次获取到一条数据,就交给下游服务的业务逻辑来执行。通过这套机制,就实现了 MQ 故障时候的自动故障感知,以及自动降级。如果系统的负载和并发不是很高的话,用这套方案大致是没问题的。

因为在生产落地的过程中,包括大量的容灾演练以及生产实际故障发生时的表现来看,都是可以有效的保证 MQ 故障时,业务流程继续自动运行的。

④故障的自动恢复

如果降级开关打开之后,自行封装的组件需要开启一个线程,每隔一段时间尝试给 MQ 投递一个消息看看是否恢复了。

如果 MQ 已经恢复可以正常投递消息了,此时就可以通过 ZK 关闭降级开关,然后可靠消息服务继续投递消息到 MQ,下游服务在确认 KV 存储的各个队列中已经没有数据之后,就可以重新切换为从 MQ 消费消息。

⑤更多的业务细节

上面说的那套方案是一套通用的降级方案,但是具体的落地是要结合各个公司不同的业务细节来决定的,很多细节多没法在文章里体现。

比如说你们要不要保证消息的顺序性?是不是涉及到需要根据业务动态,生成大量的 Key?等等。

此外,这套方案实现起来还是有一定的成本的,所以建议大家尽可能还是 Push 公司的基础架构团队,保证 MQ 的 99.99% 可用性,不要宕机。

其次就是根据大家公司实际对高可用的需求来决定,如果感觉 MQ 偶尔宕机也没事,可以容忍的话,那么也不用实现这种降级方案。

但是如果公司领导认为 MQ 中间件宕机后,一定要保证业务系统流程继续运行,那么还是要考虑一些高可用的降级方案,比如本文提到的这种。

最后再说一句,真要是一些公司涉及到每秒几万几十万的高并发请求,那么对 MQ 的降级方案会设计的更加的复杂,那就远远不是这么简单可以做到的。

本地消息表方案

本地消息表其实是国外的 ebay 搞出来的这么一套思想。

这个大概意思是这样的:

  1. A 系统在自己本地一个事务里操作同时,插入一条数据到消息表;
  2. 接着 A 系统将这个消息发送到 MQ 中去;
  3. B 系统接收到消息之后,在一个事务里,往自己本地消息表里插入一条数据,同时执行其他的业务操作,如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息
  4. B 系统执行成功之后,就会更新自己本地消息表的状态以及 A 系统消息表的状态;
  5. 如果 B 系统处理失败了,那么就不会更新消息表状态,那么此时 A 系统会定时扫描自己的消息表,如果有未处理的消息,会再次发送到 MQ 中去,让 B 再次处理;
  6. 这个方案保证了最终一致性,哪怕 B 事务失败了,但是 A 会不断重发消息,直到 B 那边成功为止。

这个方案说实话最大的问题就在于严重依赖于数据库的消息表来管理事务啥的,如果是高并发场景咋办呢?咋扩展呢?所以一般确实很少用。

最大努力通知方案

这个方案的大致意思就是:

  1. 系统 A 本地事务执行完之后,发送个消息到 MQ;
  2. 这里会有个专门消费 MQ 的最大努力通知服务,这个服务会消费 MQ 然后写入数据库中记录下来,或者是放入个内存队列也可以,接着调用系统 B 的接口;
  3. 要是系统 B 执行成功就 ok 了;要是系统 B 执行失败了,那么最大努力通知服务就定时尝试重新调用系统 B,反复 N 次,最后还是不行就放弃。

最大努力通知与可靠消息一致性有什么不同?

1、解决方案思想不同

  • 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收方,消息的可靠性关键由发起通知
    方来保证。
  • 最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

2、两者的业务应用场景不同

  • 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。

  • 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去(关注的不是发起通知方是谁,关注的是通知接收方是谁并可靠地接收通知)

3、技术解决方向不同

  • 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
  • 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制
    • 可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

解决方案

demo : 充值系统 -> 充值成功通知msg -> 账户系统

  • MQ的ack机制 + 通知接收方调用发起方的消息校对接口

    • 接收方监听MQ充值消息

    • 如果努力通知多次失败 ,通知接收方调用发起方的消息校对接口

    • 适合内部系统的通知

  • 也是MQ的ack机制 + 通知程序

    • 接收方不直接监听mq,由专门的通知程序监听MQ,再由通知程序通知各个接收方

方案1和方案2的不同点:
1、方案1中接收通知方与MQ接口,即接收通知方案监听 MQ,此方案主要应用与内部应用之间的通知。
2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

最大努力通知小结

  • 消息重复通知机制(ack)s
  • 消息校对机制

各方案对比

2PC TCC 可靠消息 本地消息表 最大努力通知
一致性 强一致 最终一致 最终一致 最终一致 最终一致
吞吐量
实现复杂度

合理使用分布式事务,从设计角度:高内聚低耦合,微服务拆分是否合理(粒度,边界)

DEMO

以上是关于分布式事务的主要内容,如果未能解决你的问题,请参考以下文章

理解片段事务期间片段的生命周期方法调用

提交带有全屏片段的片段事务

使用 OnItemClickListener 列出视图片段到片段事务

Android中的片段事务问题

Android从后台堆栈中删除事务

android 片段事务