Transactional consistency in .NET Core microservices: using MassTransit sagas
Posted by 星仔007
The demo is a small order-processing example:
First, a look at the result, which is simple:
The core code is as follows:
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrderProcessor.Event;
using ServiceModel;
using ServiceModel.Command;
using ServiceModel.DTO;
using ServiceModel.Event;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace OrderProcessor.Service
{
    public class OrderProcessorStateMachine : MassTransitStateMachine<ProcessingOrderState>
    {
        private readonly ILogger<OrderProcessorStateMachine> logger;

        public OrderProcessorStateMachine()
        {
            this.logger = GlobalServiceProvider.Instance.CreateScope().ServiceProvider.GetService<ILogger<OrderProcessorStateMachine>>();
            this.InstanceState(x => x.State);
            this.State(() => this.Processing);
            this.ConfigureCorrelationIds();
            // OrderSubmitted starts the saga; the other three events are handled while Processing.
            this.Initially(this.SetOrderSummitedHandler());
            this.During(Processing, this.SetStockReservedHandler(), SetPaymentProcessedHandler(), SetOrderShippedHandler());
            SetCompletedWhenFinalized();
        }

        // Every event is matched to its saga instance by the message's CorrelationId.
        private void ConfigureCorrelationIds()
        {
            this.Event(() => this.OrderSubmitted, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
            this.Event(() => this.StockReserved, x => x.CorrelateById(c => c.Message.CorrelationId));
            this.Event(() => this.PaymentProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));
            this.Event(() => this.OrderShipped, x => x.CorrelateById(c => c.Message.CorrelationId));
        }

        private EventActivityBinder<ProcessingOrderState, IOrderSubmitted> SetOrderSummitedHandler() =>
            When(OrderSubmitted)
                .Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                .Then(c => this.logger.LogInformation($"Order submitted to {c.Data.CorrelationId} received"))
                .ThenAsync(c => this.SendCommand<IReserveStock>("rabbitWarehouseQueue", c))
                .TransitionTo(Processing);

        private EventActivityBinder<ProcessingOrderState, IStockReserved> SetStockReservedHandler() =>
            When(StockReserved)
                .Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                .Then(c => this.logger.LogInformation($"Stock reserved to {c.Data.CorrelationId} received"))
                .ThenAsync(c => this.SendCommand<IProcessPayment>("rabbitCashierQueue", c));

        private EventActivityBinder<ProcessingOrderState, IPaymentProcessed> SetPaymentProcessedHandler() =>
            When(PaymentProcessed)
                .Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                .Then(c => this.logger.LogInformation($"Payment processed to {c.Data.CorrelationId} received"))
                .ThenAsync(c => this.SendCommand<IShipOrder>("rabbitDispatcherQueue", c));

        // Last step: mark the order as processed, publish OrderProcessed and finalize the saga.
        private EventActivityBinder<ProcessingOrderState, IOrderShipped> SetOrderShippedHandler() =>
            When(OrderShipped)
                .Then(c =>
                {
                    this.UpdateSagaState(c.Instance, c.Data.Order);
                    c.Instance.Order.Status = Status.Processed;
                })
                .Publish(c => new OrderProcessed(c.Data.CorrelationId, c.Data.Order))
                .Finalize();

        // Note: this runs on every event, so Created is overwritten each time as well as Updated.
        private void UpdateSagaState(ProcessingOrderState state, Order order)
        {
            var currentDate = DateTime.Now;
            state.Created = currentDate;
            state.Updated = currentDate;
            state.Order = order;
        }

        private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<ProcessingOrderState, IMessage> context)
            where TCommand : class, IMessage
        {
            // The queue address is empty in the original listing and endpointKey is unused;
            // a real address is needed here before this can run.
            var sendEndpoint = await context.GetSendEndpoint(new Uri(""));
            await sendEndpoint.Send<TCommand>(new
            {
                CorrelationId = context.Data.CorrelationId,
                Order = context.Data.Order
            });
        }

        public State Processing { get; private set; }
        public Event<IOrderSubmitted> OrderSubmitted { get; private set; }
        public Event<IOrderShipped> OrderShipped { get; set; }
        public Event<IPaymentProcessed> PaymentProcessed { get; private set; }
        public Event<IStockReserved> StockReserved { get; private set; }
    }
}
using MassTransit;
using MassTransit.MongoDbIntegration.Saga;
using OrderProcessor;
using OrderProcessor.Service;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        var connection = "amqp://lx:admin@ip:5672/my_vhost"; // an error is raised if the host is omitted
        cfg.Host(connection);
        // Redeliver after 5, 15 and 30 minutes; before that, retry immediately up to 5 times.
        cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
        cfg.UseMessageRetry(r => r.Immediate(5));
        cfg.ConfigureEndpoints(context);
        // The queue name is empty in the original listing.
        cfg.ReceiveEndpoint("", ep =>
        {
            // Saga state is persisted in MongoDB.
            ep.StateMachineSaga(new OrderProcessorStateMachine(), MongoDbSagaRepository<ProcessingOrderState>.Create("connecturl", "db"));
        });
    });
});

var app = builder.Build();
app.Run();
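The state machine refers to ProcessingOrderState and to several message contracts that are not shown in this post. The sketch below is only a guess at their shape, inferred from how the code above uses them (c.Message.CorrelationId, c.Data.Order, x.State, state.Created and so on); the real definitions are in the demo repository and may well differ. It assumes MassTransit v8, where SagaStateMachineInstance and ISagaVersion are in the MassTransit namespace, and it implements ISagaVersion because the MongoDB saga repository requires it. The Pending value of Status is made up for illustration; only Processed appears above.

```csharp
using System;
using MassTransit;

// Hypothetical shapes inferred from usage in the state machine; not the repository's actual code.
public enum Status { Pending, Processed }

public class Order
{
    public Status Status { get; set; }
}

// Base contract: every event and command carries the saga's CorrelationId and the order.
public interface IMessage
{
    Guid CorrelationId { get; }
    Order Order { get; }
}

// Events consumed by the saga.
public interface IOrderSubmitted : IMessage { }
public interface IStockReserved : IMessage { }
public interface IPaymentProcessed : IMessage { }
public interface IOrderShipped : IMessage { }

// Commands sent by the saga.
public interface IReserveStock : IMessage { }
public interface IProcessPayment : IMessage { }
public interface IShipOrder : IMessage { }

// Saga instance persisted between events.
public class ProcessingOrderState : SagaStateMachineInstance, ISagaVersion
{
    public Guid CorrelationId { get; set; }   // required by SagaStateMachineInstance
    public int Version { get; set; }          // required by ISagaVersion
    public string State { get; set; }         // current state name, bound via InstanceState(x => x.State)
    public DateTime Created { get; set; }
    public DateTime Updated { get; set; }
    public Order Order { get; set; }
}
```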
These are the steps of the whole order flow.
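To make the steps easier to follow without a broker, here is a small dependency-free model of the same flow. It is only an illustration: OrderFlow, OrderEvent and Outgoing are hypothetical names that do not exist in the demo, and the real saga sends its commands through RabbitMQ rather than appending strings to a list. Throwing on an unexpected event is a choice made for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, broker-free model of the order flow; not part of the demo.
public enum OrderEvent { OrderSubmitted, StockReserved, PaymentProcessed, OrderShipped }

public sealed class OrderFlow
{
    // Mirrors the saga states: Initial -> Processing -> Final.
    public string State { get; private set; } = "Initial";

    // Names of the commands sent and events published, in order.
    public List<string> Outgoing { get; } = new List<string>();

    public void Handle(OrderEvent e)
    {
        switch (State, e)
        {
            case ("Initial", OrderEvent.OrderSubmitted):
                Outgoing.Add("ReserveStock");      // ask the warehouse to reserve stock
                State = "Processing";
                break;
            case ("Processing", OrderEvent.StockReserved):
                Outgoing.Add("ProcessPayment");    // ask the cashier to take payment
                break;
            case ("Processing", OrderEvent.PaymentProcessed):
                Outgoing.Add("ShipOrder");         // ask the dispatcher to ship
                break;
            case ("Processing", OrderEvent.OrderShipped):
                Outgoing.Add("OrderProcessed");    // published once the order is done
                State = "Final";
                break;
            default:
                throw new InvalidOperationException($"Event {e} is not handled in state {State}");
        }
    }
}
```

Calling Handle with OrderSubmitted, StockReserved, PaymentProcessed and OrderShipped in turn leaves State at "Final" and Outgoing holding ReserveStock, ProcessPayment, ShipOrder and OrderProcessed, which is the sequence the state machine above drives.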
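I wanted to post all of the code and walk through the process for reference, but it is late and time is short, and I really ought to get the program running first. I have work tomorrow as usual, so I will not dig any deeper for now.
Full demo code: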
exercise/MassTransitDemo/MassTransitSagasDemo at master · liuzhixin405/exercise (github.com)
If you are interested, there is another demo:
exercise/MassTransitDemo/SagaTest-master at master · liuzhixin405/exercise (github.com)
MassTransit official site:
MassTransit (masstransit-project.com)
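MassTransit really is very good. I have not found a Chinese translation of the documentation yet, so I only skimmed it, and with my limited English there is still a lot I do not understand. The demos all come from developers abroad. There are Chinese articles on the subject, but regrettably the more in-depth material is all contributed by people overseas. There are many well-written, ready-made microservice demos, which can be borrowed for a project where they fit.
This demo still needs further work. Once I have improved it, or someone more experienced has helped fill in the gaps, I will complete the walkthrough and code in this post. Today is only a start.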