聊聊分布式事务一致性与本地消息表
Posted 又见阿郎
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊分布式事务一致性与本地消息表相关的知识,希望对你有一定的参考价值。
我个人比较推崇本地消息表模式来实现最终一致性。首先本地消息表的设计不仅可以解决事务一致性的问题,对于消息队列常见问题中的消息丢失与消息幂等其实都是可以通过本地消息表来解决;其带来的好处是多重的。
什么是分布式事务一致性
大白话就是对数据源进行拆分后,多库多机器的多数据库事务一致性问题。因为此时你的系统可能不是分布式的(当然这不满足分布式的概念),我想表达的是事务的一致性其实就是多机器多库下的数据事务一致性问题。
不管是计算机还是数学行业等,某些问题的解决都是基于理论科学来的,所以我们这行的顶级职称有计算机科学家这类的头衔。分布式事务的解决依赖CAP原则与BASE理论。
CAP
CAP理论描述了分布式系统中的基本原则,其中C是指Consistency(一致性),A是指Availability(可用性)和P是指Partition tolerance(区分容错性)。CAP原则指CAP三者不能同时满足,要么能同时满足CP即同时满足区分容错性和一致性,要么同时满足AP即同时满足区分容错性和可用性。从中可以看出,P是分布式系统的基础,没有区分容错性就谈不上分布式系统了。
CAP只能满足AP或CP的原因是,分布式节点之间通常存在一个数据拷贝的过程,在这一个过程中是只能满足AP或者CP的。举个例子好了,比如redis分布式集群中,当一个写请求打到一个主节点上,几乎同时另一个读请求打到redis这个主节点的对应从节点上,此时请问该从节点能返回刚才写在主节点的数据吗?若要保证CP,此时数据正在从主节点复制到从节点的路上,此时该节点的该数据是不可用的;若要保证AP,因为数据正在从主节点复制到从节点的路上,因此节点间的数据状态是不一致的。
BASE理论
前面讲到分布式系统的CAP原则要么同时满足AP要么同时满足CP,那么BASE理论则是CAP原则权衡的结果。BASE是指Basically Available(基本可用的),Soft state(软状态),Eventual consistency(最终一致性)。
Basically Available是指在分布式集群节点中,若某个节点宕机,或者在数据在节点间复制的过程中,只有部分数据不可用,但不影响整个系统的整体的可用性。
Soft state是指软状态即这个状态只是一个中间状态,允许数据在节点集群间操作过程中存在存在一个时延,这个中间状态最终会转化为最终状态。
Eventual consistency是指数据在分布式集群节点间操作过程中存在时延,与ACID相反,最终一致性不是强一致性,在经过一定时间后,分布式集群节点间的数据拷贝能达到最终一致的状态。
分布式事务一致性
说到分布式事务一致性,就离不开2PC与3PC理论。暂时不展开描述。先说说一致性的类型。
- 强一致性
- 弱一致性
- 最终一致性
强一致性解决方案
基于2PC与3PC实现的XA模式,比如Seata中XA模式的实现
最终一致性
- 本地消息表
- saga
- tcc
- at(seata框架独有)
什么是本地消息表模式
本地消息表方案最初是ebay提出的,其实也是BASE理论的应用,属于可靠消息最终一致性的范畴。其概念图如下:
这里是拆分出来了多个本地消息表,看自己的业务。如果规模比较小,可以只创建一个本地消息表。但业务比较大的时候,我个人是推崇这种模式,因为解耦,业务的发起方。只处理新建、已发送状态的消息;业务的消息方,只处理已接收、已处理状态的消息。
消息的丢失与幂等
消息队列的引入可以实现业务的异步与解耦,以及流量削峰。尤其是前两者,我在之前的项目中用到了,反正我很爽。但是没有银弹。消息队列的引入会带来一定的问题,其中最常见的就是消息的丢失与幂等!
消息幂等
消息丢失
总结
如上,引入一个中间的本地消息表,除了解决事务的一致性外,同样可以解决消息的丢失与幂等性问题,一举多得。而且从业务的健壮性与数据一致性来看,一般都会增加一个补偿机制,我在之前的项目中就强力推行补偿机制。本地消息表模式就是补充机制的最直观方式。
分布式事务| 使用 dotnetcore/CAP 的本地消息表模式
本地消息表模式
本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆分为多个本地事务,事务之间通过事件消息衔接,事件消息和上个事务共用一个本地事务存储到本地消息表,再通过定时任务轮询本地消息表进行消息投递,下游业务订阅消息进行消费,本质上是依靠消息的重试机制达到最终一致性。其示意图如下所示,主要分为以下三步:
本地业务数据和发布的事件消息共享同一个本地事务,进行数据落库,其中事件消息持久化到单独的事件发件箱表中。
单独的进程或线程不断查询发件箱表中未发布的事件消息。
将未发布的事件消息发布到消息代理,然后将消息的状态更新为已发布。
dotnetcore/CAP 简介
在《.NET 微服务:适用于容器化 .NET 应用程序的体系结构》电子书中,提及了如何设计兼具原子性和弹性的事件总线,其中提出了三种思路:使用完整的事件溯源模式,使用事务日志挖掘,使用发件箱模式(The outbox pattern)。其中事件溯源模式实现相对复杂,事务日志挖掘局限于特定类型数据库,而发件箱模式则是一种相对平衡的实现方式,其基于事务数据库表和简化的事件溯源模式。发件箱模式的示意图如下所示:
从上图可以看出,其实现原理与上面提及的本地消息表模式十分相似,我们可以理解其也是本地消息表模式的一种实现。作者Savorboard
也正是受该电子书启发,实现了.NET版本的本地消息表模式,并命名为dotnetcore/CAP
,其架构如下图所示。其同时也兼具EventBus的功能,其支持主流消息代理,如RabbitMQ、Redis、Kafka和Pulsar,同时支持多种持久化存储方式进行消息存储,包括MySQL、PostgreSQL、SQL Server和MongoDB。因此基于dotnetcore/CAP
,.NET 开发者也可以快速实现微服务间的异步通信和解决分布式事务问题。
基于dotnetcore/CAP 实现分布式事务
那具体如何使用dotnetcore/CAP
来解决分布式事务问题呢,基于本地消息表加补偿模式实现。dotnetcore/CAP的补偿模式比较巧妙,其基于发布事件的方法签名中提供了一个回调参数。发布方法的事件签名为:PublishAsync<T>(string name, T? contentObj, string? callbackName=null)
,第一个参数是事件名称,第二个参数为事件数据包,第三个参数用来指定于接收事件消费结果的回调地址(事件),但是否触发回调,取决于事件订阅方是否定义返回参数,若有则触发。如果基于CAP实现下单流程,则其流程如下所示:
接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,订单服务、库存服务和支付服务均依赖共享类库项目,其中共享类库添加DotNetCore.Cap
、DotNetCore.Cap.MySql
和DotNetCore.Cap.RabbitMQ
NuGet包。
项目 | 项目名 | 项目类型 |
---|---|---|
订单服务 | CapDemo.OrderService | ASP.NET Core Web API |
库存服务 | CapDemo.InventoryService | Worker Service |
支付服务 | CapDemo.PaymentService | Worker Service |
共享类库 | CapDemo.Shared | Class Library |
订单服务
订单服务首先需要暴露WebApi用于订单的创建,为了方便数据的持久化,首先添加Pomelo.EntityFrameworkCore.MySql
Nuget包,然后创建OrderDbContext
:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using CapDemo.OrderService.Domains;
namespace CapDemo.OrderService.Data
public class OrderDbContext : DbContext
public OrderDbContext (DbContextOptions<OrderDbContext> options)
: base(options)
public DbSet<CapDemo.OrderService.Domains.Order> Order get; set; = default!;
然后创建OrdersController
并添加PostOrder
方法如下所示:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using CapDemo.OrderService.Data;
using CapDemo.OrderService.Domains;
using DotNetCore.CAP;
using CapDemo.Shared;
using CapDemo.Shared.Models;
namespace CapDemo.OrderService.Controllers
[Route("api/[controller]")]
[ApiController]
public class OrdersController : ControllerBase
private readonly OrderDbContext _context;
private readonly ICapPublisher _capPublisher;
private readonly ILogger<OrdersController> _logger;
public OrdersController(OrderDbContext context, ICapPublisher capPublisher,ILogger<OrdersController> logger)
_context = context;
_capPublisher = capPublisher;
_logger = logger;
[HttpPost]
public async Task<ActionResult<Order>> PostOrder(CreateOrderDto orderDto)
var shoppingItems =
orderDto.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));
var order = new Order(orderDto.CustomerId).NewOrder(shoppingItems.ToArray());
using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false))
_context.Order.Add(order);
var deduceDto = new DeduceInventoryDto()
OrderId = order.OrderId,
DeduceStockItems = order.OrderItems.Select(
item => new DeduceStockItem(item.SkuId, item.Qty, item.Price)).ToList()
;
await _capPublisher.PublishAsync(TopicConsts.DeduceInventoryCommand,deduceDto,
callbackName: TopicConsts.CancelOrderCommand);
await _context.SaveChangesAsync();
await trans.CommitAsync();
_logger.LogInformation($"Order [order.OrderId] created successfully!");
return CreatedAtAction("GetOrder", new id = order.OrderId , order);
从代码中可以看出,在订单持久化和事件发布之前先行使用事务包裹:using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false))
,以确保订单和事件的持久化共享同一个事务,这一步是使用CAP的重中之重。订单服务通过注入了ICapPublisher
服务,并通过PublishAsync
方法发布扣减库存事件,并指定了callbackName: TopicConsts.CancelOrderCommand
。订单服务还需要订阅取消订单和订单支付结果的事件,进行订单状态的更新,添加OrderConsumers
如下所示,其中通过实现ICapSubscribe
接口来显式标记为消费者,然后定义方法并在方法体上通过[CapSubscribe]
特性指定订阅的事件名称来完成事件的消费。
using CapDemo.OrderService.Data;
using CapDemo.Shared;
using DotNetCore.CAP;
namespace CapDemo.OrderService.Consumers;
public class OrderConsumers:ICapSubscribe
private readonly OrderDbContext _orderDbContext;
private readonly ILogger<OrderConsumers> _logger;
public OrderConsumers(OrderDbContext orderDbContext,ILogger<OrderConsumers> logger)
_orderDbContext = orderDbContext;
_logger = logger;
[CapSubscribe(TopicConsts.CancelOrderCommand)]
public async Task CancelOrder(string orderId)
if(string.IsNullOrEmpty(orderId)) return;
var order = await _orderDbContext.Order.FindAsync(orderId);
order?.CancelOrder();
_logger.LogWarning($"Order [orderId] has been canceled!");
await _orderDbContext.SaveChangesAsync();
[CapSubscribe(TopicConsts.PayOrderSucceedTopic)]
public async Task MarkToPaid(string orderId)
var order = await _orderDbContext.Order.FindAsync(orderId);
order?.UpdateToPaid();
await _orderDbContext.SaveChangesAsync();
最后修改Program.cs
添加CAP服务和消费者的注册。
using CapDemo.OrderService.Consumers;
using CapDemo.OrderService.Data;
using Microsoft.EntityFrameworkCore;
using DotNetCore.CAP;
var builder = WebApplication.CreateBuilder(args);
// 注册 DbContext
var connectionStr = builder.Configuration.GetConnectionString("Default");
builder.Services.AddDbContext<OrderDbContext>(options =>
options.UseMySql(connectionStr ?? throw new InvalidOperationException("Connection string 'OrderDbContext' not found."), ServerVersion.AutoDetect(connectionStr)));
// 注册CAP
builder.Services.AddCap(x =>
x.UseEntityFramework<OrderDbContext>();
x.UseRabbitMQ("localhost");
);
// 注册消费者
builder.Services.AddTransient<OrderConsumers>();
库存服务
库存服务在整个下单流程的职责主要是库存的扣减和返还,添加InventoryConsumer
来消费库存扣减和返还事件即可。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using CapDemo.Shared;
using CapDemo.Shared.Models;
using DotNetCore.CAP;
namespace CapDemo.InventoryService.Consumers
public class InventoryConsumer : ICapSubscribe
private readonly ILogger<InventoryConsumer> _logger;
private readonly ICapPublisher _capPublisher;
public InventoryConsumer(ILogger<InventoryConsumer> logger, ICapPublisher capPublisher)
_logger = logger;
_capPublisher = capPublisher;
[CapSubscribe(TopicConsts.DeduceInventoryCommand)]
public async Task DeduceInventory(DeduceInventoryDto deduceStockDto)
// 省略扣减库存逻辑,直接成功
_logger.LogInformation($"Inventory has been deducted for order [deduceStockDto.OrderId]!");
var amount = deduceStockDto.DeduceStockItems.Sum(t => t.Price * t.Qty);
await _capPublisher.PublishAsync(TopicConsts.PayOrderCommand, new PayDto(deduceStockDto.OrderId, amount),
callbackName: TopicConsts.ReturnInventoryTopic);
[CapSubscribe(TopicConsts.ReturnInventoryTopic)]
public void ReturnInventory(PayResult payResult)
// 若支付失败,则执行库存返还并发布取消订单命令
if (!payResult.IsSucceed)
// 省略返还库存逻辑
_logger.LogWarning($"Inventory has been returned for order [payResult.OrderId]");
_capPublisher.PublishAsync(TopicConsts.CancelOrderCommand, payResult.OrderId);
以上的库存扣减实现中省略了扣减库存逻辑,直接模拟成功扣减,也就无需触发回调,那就可以通过将方法签名定义为public async Task DeduceInventory(DeduceInventoryDto deduceStockDto)
,这样就不会触发订单服务发布扣减库存事件时指定的回调。库存扣减成功随即发布支付订单的命令,由于不涉及其他数据持久化,因此无需手动开启事务。发布支付订单命令时指定了callbackName: TopicConsts.ReturnInventoryTopic
,其将根据订单支付结果也就是ReturnInventory(PayResult payResult)
中指定的入参决定是否返还库存。最后同样需要在Program.cs
中注入CAP服务和消费者:
using CapDemo.InventoryService;
using CapDemo.InventoryService.Consumers;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
var connStr = context.Configuration.GetConnectionString("Default");
services.AddCap(x =>
x.UseMySql(connStr);
x.UseRabbitMQ("localhost");
);
services.AddTransient<InventoryConsumer>();
)
.Build();
await host.RunAsync();
支付服务
对于下单流程的支付用例来说,要么成功要么失败,并不需要像以上两个服务一样定义补偿逻辑,因此仅需要订阅支付订单命令即可,定义PaymentConsumers
如下所示,因为库存服务发布支付订单命令时指定的回调依赖支付结果,因此该方法必须指定与回调匹配的返回参数类型,也就是PayResult
。
using CapDemo.Shared;
using CapDemo.Shared.Models;
using DotNetCore.CAP;
namespace CapDemo.PaymentService.Consumers;
public class PaymentConsumers:ICapSubscribe
private readonly ICapPublisher _capPublisher;
private readonly ILogger<PaymentConsumers> _logger;
public PaymentConsumers(ICapPublisher capPublisher,ILogger<PaymentConsumers> logger)
_capPublisher = capPublisher;
_logger = logger;
[CapSubscribe(TopicConsts.PayOrderCommand)]
public async Task<PayResult> Pay(PayDto payDto)
bool isSucceed = false;
if (payDto.Amount % 2 == 0)
isSucceed = true;
_logger.LogInformation($"Order [payDto.OrderId] paid successfully!");
await _capPublisher.PublishAsync(TopicConsts.PayOrderSucceedTopic, payDto.OrderId);
else
isSucceed = false;
_logger.LogWarning($"Order [payDto.OrderId] payment failed!");
return new PayResult(payDto.OrderId, isSucceed);
最后同样需要在Program.cs
中注入CAP服务和消费者:
using CapDemo.PaymentService;
using CapDemo.PaymentService.Consumers;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
var connStr = context.Configuration.GetConnectionString("Default");
services.AddCap(x =>
x.UseMySql(connStr);
x.UseRabbitMQ("localhost");
);
services.AddTransient<PaymentConsumers>();
)
.Build();
await host.RunAsync();
运行结果
使用docker启动MySQL和RabbitMQ,然后再启动三个服务,并在订单服务的Swagger中发起订单创建请求,如下图所示:
最终执行结果如下图所示:
打开RabbitMQ后台,可以看见CAP为每个服务创建了一个唯一队列接收消息,并通过创建的名为cap.default.router
的Exchange根据事件名称作为RoutingKey
进行消息路由。
其中通过dotnetcore/CAP发布的消息结构如下图所示,该图是订单服务发布的扣减库存的消息。
打开MySQL,可以发现dotnetcore/CAP 根据配置的连接字符串,分别为各个服务创建了cap.published
和cap.received
消息表,如下图所示:
小结
通过以上示例,可以发现dotnetcore/CAP无疑是一个出色的事件总线,简单易用且能确保事件的有效送达。同时基于dotnetcore/CAP的本地消息表模式和补偿模式,也可以有效的实现分布式事务。但相较而言,补偿仅限于直接上下游服务之间,不能链式反向补偿,控制逻辑比较分散,属于协同式事务,各个服务需要订阅自己关注的事件并实现,适用于小中型项目,对于大型项目而言尤其需要注意事件的流转,以避免陷入事件漩涡。
以上是关于聊聊分布式事务一致性与本地消息表的主要内容,如果未能解决你的问题,请参考以下文章
分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(中)
分布式事务解决方案 | Seata | 本地消息表 | 事务消息 | 最大努力通知 | 消息丢失重复消费堆积 有序| 缓存数据库一致性