ABP工作单元

Posted zzqvq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ABP工作单元相关的知识,希望对你有一定的参考价值。

简介

Unit of work:维护受业务事务影响的对象列表,并协调变化的写入和并发问题的解决。即管理对象的CRUD操作,以及相应的事务与并发问题等。Unit of Work是用来解决领域模型存储和变更工作,而这些数据层业务并不属于领域模型本身具有的
我们知道在ABP中应用服务层,仓储。默认是启用工作单元模式的
若我们关闭了全局的工作单元,则必须以特性的方式在 class,method interface上加上[Unit of work]
然后ABP会通过Castle 的动态代理(Dynamic Proxy)拦截,Unit of work Attribute,进行动态注入,实现了 UnitOfworkManager对方法的管理(通过事务)
其流程如下
Abp初始化=>注册uow相关组件=>监听ioc注册事件=>是否有unitofwork特性.=>有的话注册拦截器=>在拦截器中通过using包裹原有方法代码,并执行=>最后看是否调用了Complete方法=>是的话Savechanges

启动流程

private AbpBootstrapper([NotNull] Type startupModule, [CanBeNull] Action<AbpBootstrapperOptions> optionsAction = null)
{
    Check.NotNull(startupModule, nameof(startupModule));

    var options = new AbpBootstrapperOptions();
    optionsAction?.Invoke(options);

    if (!typeof(AbpModule).GetTypeInfo().IsAssignableFrom(startupModule))
    {
        throw new ArgumentException($"{nameof(startupModule)} should be derived from {nameof(AbpModule)}.");
    }

    StartupModule = startupModule;

    IocManager = options.IocManager;
    PlugInSources = options.PlugInSources;

    _logger = NullLogger.Instance;

    if (!options.DisableAllInterceptors)
    {
        AddInterceptorRegistrars();
    }
}

在AddAbp创建abpBootsstrapper时,会对执行这个ctor函数,可以看到最后一行有个AddInterceptorRegistrars
这里就是注册所有的拦截器

private void AddInterceptorRegistrars()
{
    ValidationInterceptorRegistrar.Initialize(IocManager);
    AuditingInterceptorRegistrar.Initialize(IocManager);
    EntityHistoryInterceptorRegistrar.Initialize(IocManager);
    UnitOfWorkRegistrar.Initialize(IocManager);
    AuthorizationInterceptorRegistrar.Initialize(IocManager);
}

其中UnitOfWorkRegistrar.Initialize(IocManager);就是注册工作单元拦截器

internal static class UnitOfWorkRegistrar
{
    /// <summary>
    /// Initializes the registerer.
    /// </summary>
    /// <param name="iocManager">IOC manager</param>
    public static void Initialize(IIocManager iocManager)
    {
        //  添加组件注册事件
        iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) =>
        {
            var implementationType = handler.ComponentModel.Implementation.GetTypeInfo();
            // 根据unitofwork特性注册
            HandleTypesWithUnitOfWorkAttribute(implementationType, handler);
            //  按照约定注册
            HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler);
        };
    }

    private static void HandleTypesWithUnitOfWorkAttribute(TypeInfo implementationType, IHandler handler)
    {
        //  IsUnitOfWorkType:如果给定类型实现有unitofwork返回true
        //  AnyMethodHasUnitOfWork:给定类型的成员有unitofwork返回tue
        //  这里只做了一件事 如果当前类型有Unitofwork特性。则会注册拦截器
        if (IsUnitOfWorkType(implementationType) || AnyMethodHasUnitOfWork(implementationType))
        {
            handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor)));
        }
    }

    private static void HandleConventionalUnitOfWorkTypes(IIocManager iocManager, TypeInfo implementationType, IHandler handler)
    {
        //  IUnitOfWorkDefaultOptions:用于设置/获取工作单元的默认选项 (范围超时隔离等级等)
        //  这里是判断ioc容器中有没有注册   IUnitOfWorkDefaultOptions   防止后面获取不到出异常
        if (!iocManager.IsRegistered<IUnitOfWorkDefaultOptions>())
        {
            return;
        }
        //  从ioc容器中获取 IUnitOfWorkDefaultOptions
        var uowOptions = iocManager.Resolve<IUnitOfWorkDefaultOptions>();
        //  当前类型是否是 约定的类型,是的话注册拦截器
        //  IRepository,IApplicationService实现了这两个的会注册拦截器
        if (uowOptions.IsConventionalUowClass(implementationType.AsType()))
        {
            handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor)));
        }
    }

    private static bool IsUnitOfWorkType(TypeInfo implementationType)
    {
        return UnitOfWorkHelper.HasUnitOfWorkAttribute(implementationType);
    }

    private static bool AnyMethodHasUnitOfWork(TypeInfo implementationType)
    {
        return implementationType
            .GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)
            .Any(UnitOfWorkHelper.HasUnitOfWorkAttribute);
    }
}

实现原理

工作单元拦截器

AbpBootstraper创建之后会执行工作单元拦截器的注册. 下面看看注册的拦截器长什么样子的

internal class UnitOfWorkInterceptor : IInterceptor
{
    private readonly IUnitOfWorkManager _unitOfWorkManager;
    private readonly IUnitOfWorkDefaultOptions _unitOfWorkOptions;

    public UnitOfWorkInterceptor(IUnitOfWorkManager unitOfWorkManager, IUnitOfWorkDefaultOptions unitOfWorkOptions)
    {
        _unitOfWorkManager = unitOfWorkManager;
        _unitOfWorkOptions = unitOfWorkOptions;
    }

    /// <summary>
    /// 拦截方法
    /// </summary>
    public void Intercept(IInvocation invocation)
    {
        MethodInfo method;
        try
        {
            method = invocation.MethodInvocationTarget;
        }
        catch
        {
            method = invocation.GetConcreteMethod();
        }
        //  工作单元特性
        var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method);
        //  如果没有工作单元这个特性,直接执行原方法里的代码
        if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled)
        {
            invocation.Proceed();
            return;
        }

        // 这里分异步和同步执行的,
        PerformUow(invocation, unitOfWorkAttr.CreateOptions());
    }

    private void PerformUow(IInvocation invocation, UnitOfWorkOptions options)
    {
        //  异步
        if (invocation.Method.IsAsync())
        {
            PerformAsyncUow(invocation, options);
        }
        //  同步
        else
        {
            PerformSyncUow(invocation, options);
        }
    }

    private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options)
    {   //  开启一个工作单元
        using (var uow = _unitOfWorkManager.Begin(options))
        {
            //  执行原方法内部代码 
            //  如果没有出错,就会执行Complete方法
            invocation.Proceed();
            uow.Complete();
        }
    }

    private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options)
    {   //  开启一个工作单元
        var uow = _unitOfWorkManager.Begin(options);
        //  执行原方法内部代码,如果出错了则,释放当前工作单元
        try
        {
            invocation.Proceed();
        }
        catch
        {
            uow.Dispose();
            throw;
        }
        //  如果异步方法没有返回值.
        if (invocation.Method.ReturnType == typeof(Task))
        {
            invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally(
                (Task) invocation.ReturnValue,//    Task actualReturnValue,
                async () => await uow.CompleteAsync(),//    Func<Task> postAction
                exception => uow.Dispose()//    Action<Exception> finalAction
            );
        }
        // 如果异步方法返回值是Task<TResult>
        else 
        {
            invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult(
                invocation.Method.ReturnType.GenericTypeArguments[0],// Type taskReturnType
                invocation.ReturnValue,// object actualReturnValue
                async () => await uow.CompleteAsync(),// Func<Task> action
                exception => uow.Dispose()//Action<Exception> finalAction
            );
        }
    }
}
}

下面先看看同步的uow方法:同步的没什么好说的,就是开启一个工作单元,或者可以说开启了一个事务.
执行内部的方法,执行没有错误的情况下,会调用Complete()方法. Complete方法等下在讲.
下面看看实际情况工作单元下类之间的方法是怎么调用的.

public class InvoiceService
{
    private readonly InvoiceCoreService _invoiceCoreService;
    public InvoiceService(InvoiceCoreService invoiceCoreService)
    {
        _invoiceCoreService=InvoiceCoreService;
    }

    public bool Invoice(InvoiceDto input)
    {
        return  _invoiceCoreService.InvoiceElectronic(input);
    }
}
public class InvoiceCoreService:ITransientDependency
{
    public readonly ThridPartyService _thridPartyService;
    public InvoiceCoreService(ThridPartyService thridPartyService)
    {
        _thridPartyService=thridPartyService;
    }
    [UnitOfWork]
    public bool InvoiceElectronic(InvoiceDto input)
    {
       var res= _thridPartyService.Run(input);
       Console.WriteLine("调用ThirdParty完成");
       return res;
    }
}
public class ThridPartyService:ITransientDependency
{
    [UnitOfWork]
    public bool Run(InvoiceDto input)
    {
        Console.WriteLine("调百旺开电子票");
        return true;
    }
}

这是我工作中的例子,首先要做的就是模拟开电子票
InvoiceService会调用InvoiceCoreService,InvoiceCoreService会调用ThirdPartyService.
那么执行的过程是怎么样的呢?

public bool Invoice(InvoiceDto Input)
{
    using(var uow=_unitOfWrokManager.Begin(options))
    {
        bool res=false;
        using(var uow2=_unitOfWrokManager.Begin(options))
        {
           res=  ThridPartyService.Run();
           uow2.Complete();
           return res;
        }
        // invoiceCoreService
        Console.WriteLine("调用ThirdParty完成");
        Uow.Complete();
    }
}

两个工作单元是用using嵌套的 ,只要代码执行失败了,就会导致Complete不会调用.
而 Complete() 方法没有执行,则会导致 uow 对象被释放的时候
uow.Dispose() 内部检测到 Complete() 没有被调用,Abp 框架也会自己抛出异常
下面看下异步方法
首先是没有返回值的情况也就是返回值是Task

public static async Task AwaitTaskWithPostActionAndFinally(
Task actualReturnValue, 
Func<Task> postAction,
Action<Exception> finalAction)
{
    Exception exception = null;

    try
    {
        // 执行原方法
        await actualReturnValue;
        await postAction();// async () => await uow.CompleteAsync() 提交更改
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        //   exception => uow.Dispose() 最后都会执行uow的释放方法
        finalAction(exception);
    }
}

下面看看有返回值的情况

public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, 
Func<Task> action, Action<Exception> finalAction)
{   //  反射获取到 AwaitTaskWithPostActionAndFinallyAndGetResult 并调用
    return typeof (InternalAsyncHelper)
        .GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static)
        .MakeGenericMethod(taskReturnType)
        .Invoke(null, new object[] { actualReturnValue, action, finalAction });
}
public static async Task<T> AwaitTaskWithPostActionAndFinallyAndGetResult<T>(Task<T> actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
    Exception exception = null;

    try
    {
        //  执行原方法内部代码 并获取返回值
        var result = await actualReturnValue;
        //  执行CompleteAsync方法
        await postAction();
        return result;
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        //  Dispose方法
        finalAction(exception);
    }
}

工作单元拦截器就到此为止了

IUnitOfWorkManager

拦截器中有_unitOfWorkManager.Begin,之前只是说了 是开启了一个工作单元,那么这个是什么呢,我们来看看吧.

 internal class UnitOfWorkManager : IUnitOfWorkManager, ITransientDependency
    {
        private readonly IIocResolver _iocResolver;
        private readonly ICurrentUnitOfWorkProvider _currentUnitOfWorkProvider;
        private readonly IUnitOfWorkDefaultOptions _defaultOptions;

        public IActiveUnitOfWork Current
        {
            get { return _currentUnitOfWorkProvider.Current; }
        }

        public UnitOfWorkManager(
            IIocResolver iocResolver,
            ICurrentUnitOfWorkProvider currentUnitOfWorkProvider,
            IUnitOfWorkDefaultOptions defaultOptions)
        {
            _iocResolver = iocResolver;
            _currentUnitOfWorkProvider = currentUnitOfWorkProvider;
            _defaultOptions = defaultOptions;
        }

        public IUnitOfWorkCompleteHandle Begin()
        {
            return Begin(new UnitOfWorkOptions());
        }

        public IUnitOfWorkCompleteHandle Begin(TransactionScopeOption scope)
        {
            return Begin(new UnitOfWorkOptions { Scope = scope });
        }

        public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options)
        {
            //  没有设置参数 填充默认参数
            options.FillDefaultsForNonProvidedOptions(_defaultOptions);
            //  获取当前工作单元
            var outerUow = _currentUnitOfWorkProvider.Current;
            //  当前已有工作单元,直接创建一个在它内部的工作单元
            if (options.Scope == TransactionScopeOption.Required && outerUow != null)
            {
                return new InnerUnitOfWorkCompleteHandle();
            }
            //  如果没有外部的工作单元,从ioc容器中直接获取一个
            var uow = _iocResolver.Resolve<IUnitOfWork>();
            //  绑定外部工作单元的一些事件.
            //  调用Complete方法后 设置当前工作单元为null
            uow.Completed += (sender, args) =>
            {
                _currentUnitOfWorkProvider.Current = null;
            };
            //  失败的时候,设置当前工作单元为null
            uow.Failed += (sender, args) =>
            {
                _currentUnitOfWorkProvider.Current = null;
            };
            //  从ioc容器释放
            uow.Disposed += (sender, args) =>
            {
                _iocResolver.Release(uow);
            };

            //  设置过滤器
            if (outerUow != null)
            {
                options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList());
            }
            //  调用UnitOfWorkBase的Begin方法
            uow.Begin(options);

            //  设置租户id
            if (outerUow != null)
            {
                uow.SetTenantId(outerUow.GetTenantId(), false);
            }
            //  绑定外部工作单元为刚才创建的工作单元
            _currentUnitOfWorkProvider.Current = uow;

            return uow;
        }
    }
}

可以看到返回值是IUnitOfWorkCompleteHandle

public interface IUnitOfWorkCompleteHandle : IDisposable
{
    void Complete();
    Task CompleteAsync();
}

他有个默认实现InnerUnitOfWorkCompleteHandle

internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle
{
    public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work.";

    private volatile bool _isCompleteCalled;
    private volatile bool _isDisposed;

    public void Complete()
    {
        _isCompleteCalled = true;
    }

    public Task CompleteAsync()
    {
        _isCompleteCalled = true;
        return Task.FromResult(0);
    }

    public void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        if (!_isCompleteCalled)
        {
            if (HasException())
            {
                return;
            }

            throw new AbpException(DidNotCallCompleteMethodExceptionMessage);
        }
    }
    
    private static bool HasException()
    {
        try
        {
            return Marshal.GetExceptionCode() != 0;
        }
        catch (Exception)
        {
            return false;
        }
    }
}
}

就是调用Complete的时候把_isCompleteCalled设置为true,在Dispose的时候判断,如果没有调用Complete那么会抛出异常
那么这里仅仅是做个标记的作用,savachanges 并没有在这里调用,那么数据库的保存是什么时候执行的呢?
其实之前在UnitOfManager内部的工作单元的类型就是InnerUnitOfWorkCompleteHandle,那么外部的工作单元是从Ioc容器中获取的.IUnitOfWork
它有几个默认实现,其中就有EfCoreUnitOfWork。里面就有savechanges方法. 它继承自UnitOfWorkBase。 UnitOfWorkBase继承自IUnitOfWork

public interface IUnitOfWork : IActiveUnitOfWork, IUnitOfWorkCompleteHandle
{
    /// <summary>
    /// 工作单元唯一Id,
    /// </summary>
    string Id { get; }

    /// <summary>
    /// 外部工作单元的引用对象
    /// </summary>
    IUnitOfWork Outer { get; set; }
}
public abstract class UnitOfWorkBase:IUnitOfWork
{
    //  其他代码 略.
    // 由具体的子类实现
    protected abstract void CompleteUow();
    
    public void Complete()
    {
        // 确保Complete方法没有被调用过
        PreventMultipleComplete();
        try
        {
            CompleteUow();
            _succeed = true;
            OnCompleted();
        }
        catch (Exception ex)
        {
            _exception = ex;
            throw;
        }
    }
}
public class EfCoreUnitOfWork : UnitOfWorkBase, ITransientDependency
{
    // 遍历所有有效的dbcontext,依次调用SaveChanges方法
    public override void SaveChanges()
    {
        foreach (var dbContext in GetAllActiveDbContexts())
        {
            SaveChangesInDbContext(dbContext);
        }
    }
}

以上是关于ABP工作单元的主要内容,如果未能解决你的问题,请参考以下文章

ABP官方文档翻译 3.6 工作单元

ABP工作单元

[Abp 源码分析]工作单元的实现

ABP领域层——工作单元

手工搭建基于ABP的框架 - 工作单元以及事务管理

解析ABP框架中的事务处理和工作单元,ABP事务处理