Unit of Work

Posted by sylone

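When inserting, updating, or deleting data in a database, the operations must be transactionally consistent: either all of them succeed or all of them fail. A typical example is a transfer between bank accounts A and B: if either side fails, the transaction is incomplete and must be rolled back. The Unit of Work pattern tracks the operations that belong to a transaction and commits them together once the work is done.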
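To make the all-or-nothing idea concrete, here is a minimal in-memory sketch of the bank transfer. `InMemoryAccounts` and its methods are hypothetical names invented for illustration and are not part of the code in this post; a dictionary stands in for the database.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a database table of account balances.
public class InMemoryAccounts
{
    private Dictionary<string, decimal> balances = new Dictionary<string, decimal>();
    private readonly List<Action<Dictionary<string, decimal>>> pending =
        new List<Action<Dictionary<string, decimal>>>();

    public void Open(string account, decimal amount) { balances[account] = amount; }

    public decimal BalanceOf(string account) { return balances[account]; }

    // Record a change instead of applying it immediately.
    public void Add(string account, decimal delta)
    {
        pending.Add(state =>
        {
            decimal updated = state[account] + delta;
            if (updated < 0) { throw new InvalidOperationException("Insufficient funds: " + account); }
            state[account] = updated;
        });
    }

    // Apply every recorded change to a copy; swap the copy in only if all of them succeed.
    public bool Commit()
    {
        var copy = new Dictionary<string, decimal>(balances);
        try
        {
            foreach (var change in pending) { change(copy); }
            balances = copy;
            return true;
        }
        catch (InvalidOperationException)
        {
            return false; // the original balances are untouched
        }
        finally
        {
            pending.Clear();
        }
    }
}

public static class TransferDemo
{
    public static void Main()
    {
        var bank = new InMemoryAccounts();
        bank.Open("A", 100m);
        bank.Open("B", 0m);

        // Transfer 150 from A to B: the credit to B is valid, the debit from A is not.
        bank.Add("B", 150m);
        bank.Add("A", -150m);
        Console.WriteLine(bank.Commit());       // False
        Console.WriteLine(bank.BalanceOf("B")); // 0
    }
}
```

Because the credit to B is applied only to the copy, the failed debit leaves both balances exactly as they were.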

Implementation in practice

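First, the design idea: the domain layer persists aggregates through repositories that implement a generic repository interface. An abstract base repository provides the basic implementation of that interface and translates entities into SQL statements. The domain layer can therefore perform CRUD operations without writing SQL itself, while a unit of work commits the resulting changes as one transaction.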

1) Define the repository interface, which contains the basic CRUD operations plus several query overloads.

public interface IRepository<T>
    {
        /// <summary>
        /// Inserts an object.
        /// </summary>
        /// <param name="entity"></param>
        int Insert(T entity);

        /// <summary>
        /// Updates the objects that match the condition.
        /// </summary>
        /// <param name="entity"></param>
        /// <param name="express"></param>
        int Update(T entity, Expression<Func<T, bool>> express);

        /// <summary>
        /// Deletes the objects that match the condition.
        /// </summary>
        /// <param name="express"></param>
        int Delete(Expression<Func<T, bool>> express = null);

        /// <summary>
        /// Queries a collection of objects.
        /// </summary>
        /// <param name="express"></param>
        /// <returns></returns>
        List<T> QueryAll(Expression<Func<T, bool>> express = null);

        /// <summary>
        /// Queries one page of a collection of objects.
        /// </summary>
        /// <param name="index"></param>
        /// <param name="pagesize"></param>
        /// <param name="orderFields"></param>
        /// <param name="express"></param>
        /// <returns></returns>
        List<T> QueryAll(int index,int pagesize,List<PropertySortCondition> orderFields, Expression<Func<T, bool>> express = null);

        /// <summary>
        /// Queries a collection of objects materialized as the given type.
        /// </summary>
        /// <param name="type"></param>
        /// <param name="express"></param>
        /// <returns></returns>
        List<object> QueryAll(Type type, Expression<Func<T, bool>> express = null);

        /// <summary>
        /// Queries a single object.
        /// </summary>
        /// <param name="express"></param>
        /// <returns></returns>
        T Query(Expression<Func<T, bool>> express);

        /// <summary>
        /// Queries the number of matching objects.
        /// </summary>
        /// <param name="express"></param>
        /// <returns></returns>
        object QueryCount(Expression<Func<T, bool>> express = null);
    }

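Next, provide a base implementation of the repository interface. Because the methods take lambda expressions, the implementation has to parse expression trees (I leave that part for readers to explore on their own).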
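Since the expression tree parsing is not shown, here is a rough sketch of what such a parser might look like. `SimpleConditionBuilder` is a hypothetical, heavily simplified stand-in and is not the `ConditionBuilder` used in this post; it handles only comparisons and `&&` / `||` over properties of the lambda parameter. The `@Param{n}` naming is chosen to match the parameter names the base repository generates.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

// Hypothetical, minimal translator from a lambda predicate to a SQL condition.
public class SimpleConditionBuilder
{
    private readonly List<object> arguments = new List<object>();

    public string Condition { get; private set; }

    public object[] Arguments { get { return arguments.ToArray(); } }

    public void Build<T>(Expression<Func<T, bool>> predicate)
    {
        arguments.Clear();
        Condition = Visit(predicate.Body);
    }

    private string Visit(Expression node)
    {
        // Conversions such as (int?)x.Age are transparent for this sketch.
        if (node.NodeType == ExpressionType.Convert)
        {
            return Visit(((UnaryExpression)node).Operand);
        }
        var binary = node as BinaryExpression;
        if (binary != null)
        {
            return string.Format("({0} {1} {2})",
                Visit(binary.Left), Operator(binary.NodeType), Visit(binary.Right));
        }
        var member = node as MemberExpression;
        if (member != null && member.Expression is ParameterExpression)
        {
            return member.Member.Name; // a property of the lambda parameter maps to a column
        }
        // Anything else (constants, captured variables) is evaluated and sent as a parameter.
        object value = Expression.Lambda(node).Compile().DynamicInvoke();
        arguments.Add(value);
        return "@Param" + (arguments.Count - 1);
    }

    private static string Operator(ExpressionType type)
    {
        switch (type)
        {
            case ExpressionType.Equal: return "=";
            case ExpressionType.NotEqual: return "<>";
            case ExpressionType.GreaterThan: return ">";
            case ExpressionType.GreaterThanOrEqual: return ">=";
            case ExpressionType.LessThan: return "<";
            case ExpressionType.LessThanOrEqual: return "<=";
            case ExpressionType.AndAlso: return "and";
            case ExpressionType.OrElse: return "or";
            default: throw new NotSupportedException(type.ToString());
        }
    }
}

public class Member
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class ConditionDemo
{
    public static void Main()
    {
        int minAge = 18;
        var builder = new SimpleConditionBuilder();
        builder.Build<Member>(m => m.Age >= minAge && m.Name == "Tom");
        Console.WriteLine(builder.Condition); // ((Age >= @Param0) and (Name = @Param1))
    }
}
```

Values never end up in the SQL text itself; they are collected in `Arguments` and bound as parameters, which is also what keeps the generated statements safe from SQL injection.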

public abstract class BaseRepository<T> : IRepository<T>
        where T:class,new()
    {
        private IUnitOfWork unitOfWork;

        private IUnitOfWorkContext context;

        public BaseRepository(IUnitOfWork unitOfWork, IUnitOfWorkContext context)
        {
            this.unitOfWork = unitOfWork;
            this.context = context;
        }

        Lazy<ConditionBuilder> builder = new Lazy<ConditionBuilder>();

        public string tableName {
            get
            {
                TableNameAttribute attr= (TableNameAttribute)typeof(T).GetCustomAttribute(typeof(TableNameAttribute));
                return attr.Name; 
            }
        }

        /// <summary>
        /// Inserts an object.
        /// </summary>
        /// <param name="entity"></param>
        public virtual int Insert(T entity)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, int> excute = (propertys, condition, parameters) =>
            {
                List<string> columns = new List<string>();
                List<string> names = new List<string>();
                foreach (PropertyInfo property in propertys)
                {
                    // Skip auto-increment columns; the database generates their values.
                    if (property.GetCustomAttribute(typeof(IncrementAttribute)) == null)
                    {
                        string attrName = property.Name;
                        object value = property.GetValue(entity);
                        columns.Add(attrName);
                        names.Add(string.Format("@{0}", attrName));
                        parameters.Add(attrName, value);
                    }
                }
                // List the columns explicitly so the statement does not depend on column order.
                string sql = "insert into {0} ({1}) values ({2})";
                string combineSql = string.Format(sql, tableName, string.Join(",", columns), string.Join(",", names));
                return unitOfWork.Command(combineSql, parameters);
            };
            return CreateExcute<int>(null, excute);
        }

        /// <summary>
        /// Updates the objects that match the condition.
        /// </summary>
        /// <param name="entity"></param>
        /// <param name="express"></param>
        public virtual int Update(T entity, Expression<Func<T, bool>> express)
        {
            // The statement below always has a where clause, so a condition is required.
            if (express == null) { throw new ArgumentNullException("express"); }

            Func<PropertyInfo[], string, IDictionary<string, object>, int> excute = (propertys, condition, parameters) =>
            {
                List<string> names = new List<string>();
                foreach (PropertyInfo property in propertys)
                {
                    // Auto-increment columns are never updated.
                    if (property.GetCustomAttribute(typeof(IncrementAttribute)) == null)
                    {
                        string attrName = property.Name;
                        object value = property.GetValue(entity);
                        names.Add(string.Format("{0}=@{1}", attrName, attrName));
                        parameters.Add(attrName, value);
                    }
                }
                string sql = "update {0} set {1} where {2}";
                string combineSql = string.Format(sql, tableName, string.Join(",", names), builder.Value.Condition);
                return unitOfWork.Command(combineSql, parameters);
            };
            return CreateExcute<int>(express, excute);
        }

        /// <summary>
        /// Deletes the objects that match the condition (all rows when no condition is given).
        /// </summary>
        /// <param name="express"></param>
        public virtual int Delete(Expression<Func<T, bool>> express = null)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, int> excute = (propertys, condition, parameters) =>
            {
                string sql = "delete from {0} {1}";
                string combineSql = string.Format(sql, tableName, condition);
                return unitOfWork.Command(combineSql, parameters);
            };
            return CreateExcute<int>(express, excute);
        }

        /// <summary>
        /// Queries a collection of objects.
        /// </summary>
        /// <param name="express"></param>
        /// <returns></returns>
        public virtual List<T> QueryAll(Expression<Func<T, bool>> express = null)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, List<T>> excute = (propertys, condition, parameters) =>
            {
                string sql = "select {0} from {1} {2}";
                string combineSql = string.Format(sql, string.Join(",", propertys.Select(x => x.Name)), tableName, condition);
                return context.ReadValues<T>(combineSql, parameters);
            };
            return CreateExcute<List<T>>(express, excute);
        }

        /// <summary>
        /// Queries one page of a collection of objects.
        /// </summary>
        /// <param name="index">1-based page number</param>
        /// <param name="pagesize"></param>
        /// <param name="orderFields"></param>
        /// <param name="express"></param>
        /// <returns></returns>
        public virtual List<T> QueryAll(int index,int pagesize,List<PropertySortCondition> orderFields,Expression<Func<T, bool>> express = null)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, List<T>> excute = (propertys, condition, parameters) =>
            {
                if (orderFields == null) { throw new ArgumentNullException("orderFields", "Sort fields must not be null"); }
                // Page n covers rows (n - 1) * pagesize + 1 through n * pagesize, inclusive.
                string sql = "select * from (select {0} , ROW_NUMBER() over(order by {1}) as rownum from {2} {3}) as t where t.rownum >= {4} and t.rownum <= {5}";
                string combineSql = string.Format(sql, string.Join(",", propertys.Select(x => x.Name)),string.Join(",", orderFields), tableName, condition, (index - 1) * pagesize + 1, index * pagesize);
                return context.ReadValues<T>(combineSql, parameters);
            };
            return CreateExcute<List<T>>(express, excute);
        }

        /// <summary>
        /// Queries a collection of objects materialized as the given type.
        /// </summary>
        /// <param name="type"></param>
        /// <param name="express"></param>
        /// <returns></returns>
        public virtual List<object> QueryAll(Type type, Expression<Func<T, bool>> express = null)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, List<object>> excute = (propertys, condition, parameters) =>
            {
                string sql = "select {0} from {1} {2}";
                string combineSql = string.Format(sql, string.Join(",", propertys.Select(x => x.Name)), tableName, condition);
                return context.ReadValues(combineSql, type, parameters);
            };
            return CreateExcute<List<object>>(express, excute);
        }

        /// <summary>
        /// Queries a single object.
        /// </summary>
        /// <param name="express"></param>
        /// <returns></returns>
        public virtual T Query(Expression<Func<T, bool>> express)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, T> excute = (propertys, condition, parameters) =>
            {
                string sql = "select {0} from {1} {2}";
                string combineSql = string.Format(sql, string.Join(",", propertys.Select(x => x.Name)), tableName, condition);
                return context.ExecuteReader<T>(combineSql, parameters);
            };
            return CreateExcute<T>(express, excute);
        }

        /// <summary>
        /// Queries the number of matching objects.
        /// </summary>
        /// <param name="express"></param>
        /// <returns></returns>
        public virtual object QueryCount(Expression<Func<T, bool>> express = null)
        {
            Func<PropertyInfo[], string, IDictionary<string, object>, object> excute = (propertys, condition, parameters) =>
            {
                // ExecuteScalar returns the first column of the first row, so select the count itself.
                string sql = "select count(*) from {0} {1}";
                string combineSql = string.Format(sql, tableName, condition);
                return context.ExecuteScalar(combineSql, parameters);
            };

            return CreateExcute<object>(express, excute);
        }

        private TValue CreateExcute<TValue>(Expression<Func<T, bool>> express, Func<PropertyInfo[], string, IDictionary<string, object>, TValue> excute)
        {
            Dictionary<string, object> parameters = new Dictionary<string, object>();
            PropertyInfo[] propertys = typeof(T).GetProperties();
            string condition = "";
            if (express != null)
            {
                builder.Value.Build(express, tableName);
                condition = string.Format("where {0} ", builder.Value.Condition);
                for (int i = 0; i < builder.Value.Arguments.Length; i++)
                {
                    parameters.Add(string.Format("Param{0}", i), builder.Value.Arguments[i]);
                }
            }
            return excute(propertys, condition, parameters);
        }
    }
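The repository relies on two attributes, `TableNameAttribute` and `IncrementAttribute`, whose definitions are not shown. The sketch below assumes one plausible shape for them and isolates the reflection step that turns an entity into a parameterized insert statement. `Member`, `T_Member`, and `InsertSqlSketch` are made-up names for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Assumed definition: maps an entity class to its table name.
[AttributeUsage(AttributeTargets.Class)]
public class TableNameAttribute : Attribute
{
    public string Name { get; private set; }

    public TableNameAttribute(string name) { Name = name; }
}

// Assumed definition: marks a property backed by an auto-increment column.
[AttributeUsage(AttributeTargets.Property)]
public class IncrementAttribute : Attribute { }

[TableName("T_Member")]
public class Member
{
    [Increment]
    public int Id { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class InsertSqlSketch
{
    // Builds the insert statement and fills the parameter dictionary from the entity.
    public static string BuildInsert<T>(T entity, IDictionary<string, object> parameters)
    {
        var table = typeof(T).GetCustomAttribute<TableNameAttribute>();
        if (table == null)
        {
            throw new InvalidOperationException(typeof(T).Name + " has no TableName attribute");
        }

        var columns = new List<string>();
        foreach (PropertyInfo property in typeof(T).GetProperties())
        {
            // Auto-increment columns are generated by the database, so leave them out.
            if (property.GetCustomAttribute<IncrementAttribute>() != null) { continue; }
            columns.Add(property.Name);
            parameters.Add(property.Name, property.GetValue(entity));
        }

        return string.Format("insert into {0} ({1}) values ({2})",
            table.Name,
            string.Join(",", columns),
            string.Join(",", columns.Select(c => "@" + c)));
    }
}

public static class InsertDemo
{
    public static void Main()
    {
        var parameters = new Dictionary<string, object>();
        string sql = InsertSqlSketch.BuildInsert(new Member { Name = "Tom", Age = 20 }, parameters);
        Console.WriteLine(sql);
        Console.WriteLine(parameters.Count); // 2 (Id is skipped)
    }
}
```

The statement lists `Name` and `Age` and omits `Id`; the exact column order follows whatever order reflection returns the properties in, which is one reason to name the columns explicitly rather than rely on table order.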

Next, define the unit of work. Every insert, delete, and update operation is stored in the unit of work.

public interface IUnitOfWork
    {
        /// <summary>
        /// Registers a command.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <returns></returns>
        int Command(string commandText, IDictionary<string, object> parameters);

        /// <summary>
        /// Whether the transaction has been committed.
        /// </summary>
        bool IsCommited { get; set; }

        /// <summary>
        /// Commits the transaction.
        /// </summary>
        /// <returns></returns>
        void Commit();

        /// <summary>
        /// Rolls back the transaction.
        /// </summary>
        void RollBack();
    }

Next comes the implementation of the unit of work, which internally maintains a list of commands holding the SQL statements.

public class UnitOfWork:IUnitOfWork
    {
        /// <summary>
        /// Injected context that executes the commands.
        /// </summary>
        private IUnitOfWorkContext context;

        /// <summary>
        /// List of pending SQL commands.
        /// </summary>
        private List<CommandObject> commands;

        public UnitOfWork(IUnitOfWorkContext context)
        {
            commands = new List<CommandObject>();
            this.context = context;
        }

        /// <summary>
        /// Registers an insert, delete, or update command.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <returns>Always 1; the command is only queued here, not executed.</returns>
        public int Command(string commandText, IDictionary<string, object> parameters)
        {
            IsCommited = false;
            commands.Add(new CommandObject(commandText, parameters));
            return 1;
        }

        /// <summary>
        /// Commit state.
        /// </summary>
        public bool IsCommited{ get; set; }

        /// <summary>
        /// Executes all pending commands in one transaction.
        /// </summary>
        /// <returns></returns>
        public void Commit()
        {
            if (IsCommited) { return ; }
            // Note: a SqlConnection enlists in the ambient transaction only when it is
            // opened inside the scope (or enlisted explicitly with EnlistTransaction).
            using (TransactionScope scope = new TransactionScope())
            {
                foreach (var command in commands)
                {
                    context.ExecuteNonQuery(command.command, command.parameters);
                }
                scope.Complete();
            }
            // Reached only if the transaction committed without throwing.
            commands.Clear();
            IsCommited = true;
        }

        /// <summary>
        /// Rolls back by discarding the commands that have not been committed.
        /// </summary>
        public void RollBack()
        {
            commands.Clear();
            IsCommited = false;
        }
    }
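The key property of this design is that `Command` only queues work and nothing reaches the database until `Commit`. The sketch below shows that behaviour in isolation. `QueuedUnitOfWork` is a simplified, hypothetical variant that takes a delegate in place of the context and omits the `TransactionScope`. The `CommandObject` shape is also an assumption, since its definition is not shown here.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of CommandObject: a SQL statement plus its parameters.
public class CommandObject
{
    public string command { get; private set; }
    public IDictionary<string, object> parameters { get; private set; }

    public CommandObject(string command, IDictionary<string, object> parameters)
    {
        this.command = command;
        this.parameters = parameters;
    }
}

// Simplified unit of work: the executor delegate stands in for the database context.
public class QueuedUnitOfWork
{
    private readonly List<CommandObject> commands = new List<CommandObject>();
    private readonly Action<CommandObject> executor;

    public QueuedUnitOfWork(Action<CommandObject> executor) { this.executor = executor; }

    public int PendingCount { get { return commands.Count; } }

    // Queue the command; nothing is executed yet.
    public void Command(string commandText, IDictionary<string, object> parameters)
    {
        commands.Add(new CommandObject(commandText, parameters));
    }

    // Execute everything that was queued, then empty the queue.
    public void Commit()
    {
        foreach (CommandObject command in commands) { executor(command); }
        commands.Clear();
    }
}

public static class QueueDemo
{
    public static void Main()
    {
        var executed = new List<string>();
        var unitOfWork = new QueuedUnitOfWork(c => executed.Add(c.command));

        unitOfWork.Command("update Account set Balance = Balance - @Amount where Id = @From",
            new Dictionary<string, object> { { "Amount", 50m }, { "From", 1 } });
        unitOfWork.Command("update Account set Balance = Balance + @Amount where Id = @To",
            new Dictionary<string, object> { { "Amount", 50m }, { "To", 2 } });

        Console.WriteLine(executed.Count); // 0: nothing has run yet
        unitOfWork.Commit();
        Console.WriteLine(executed.Count); // 2: both statements ran during Commit
    }
}
```

Clearing the queue after a successful commit matters: otherwise a later commit would run the earlier statements a second time.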

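Finally, define the context that the unit of work uses to execute and commit its commands, together with its implementation. This is the same code as in the repository pattern.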

public interface IUnitOfWorkContext
    {

        /// <summary>
        /// Executes a command that does not return rows.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        int ExecuteNonQuery(string commandText, IDictionary<string, object> parameters = null);

        /// <summary>
        /// Queries a collection of objects.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <param name="load">Custom row mapping</param>
        /// <returns></returns>
        List<T> ReadValues<T>(string commandText, IDictionary<string, object> parameters = null, Func<IDataReader, T> load = null) where T : class, new();

        /// <summary>
        /// Queries a collection of objects materialized as the given type.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="type"></param>
        /// <param name="parameters"></param>
        /// <param name="setItem"></param>
        /// <returns></returns>
        List<object> ReadValues(string commandText, Type type, IDictionary<string, object> parameters = null, Action<dynamic> setItem = null);

        /// <summary>
        /// Queries a single object.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <param name="load">Custom row mapping</param>
        /// <returns></returns>
        T ExecuteReader<T>(string commandText, IDictionary<string, object> parameters = null, Func<IDataReader, T> load = null) where T : class,new();

        /// <summary>
        /// Queries a scalar value such as a count.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <returns></returns>
        object ExecuteScalar(string commandText, IDictionary<string, object> parameters = null);
    }

Finally, the implementation:

public abstract class UnitOfWorkContext : IUnitOfWorkContext,IDisposable
    {
        /// <summary>
        /// Name of the connection string in the configuration file.
        /// </summary>
        public abstract string Key { get; }

        private SqlConnection connection;

        private SqlConnection Connection 
        {
            get 
            {
                if (connection == null)
                {
                    ConnectionStringSettings settings = ConfigurationManager.ConnectionStrings[Key];
                    connection = new SqlConnection(settings.ConnectionString);
                }
                return connection;
            }    
        }

        /// <summary>
        /// Executes a command that does not return rows.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        public int ExecuteNonQuery(string commandText, IDictionary<string, object> parameters = null)
        {
            Func<SqlCommand, int> excute = (command) =>
            {
                return command.ExecuteNonQuery();
            };
            return CreateDbCommondAndExcute<int>(commandText, parameters, excute);
        }

        /// <summary>
        /// Queries a collection of objects.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <param name="load">Custom row mapping</param>
        /// <returns>List of entities of type T</returns>
        public List<T> ReadValues<T>(string commandText, IDictionary<string, object> parameters = null, Func<IDataReader, T> load = null) where T : class,new()
        {
            Func<SqlCommand, List<T>> excute = (dbCommand) =>
            {
                List<T> result = new List<T>();
                using (IDataReader reader = dbCommand.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        if (load == null)
                        {
                            load = (s) => { return s.GetReaderData<T>(); };
                        }
                        var item = load(reader);
                        result.Add(item);
                    }
                    return result;
                }
            };
            return CreateDbCommondAndExcute(commandText, parameters, excute);
        }

        /// <summary>
        /// Queries a collection of objects materialized as the given type.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="type"></param>
        /// <param name="parameters"></param>
        /// <param name="setItem"></param>
        /// <returns></returns>
        public List<object> ReadValues(string commandText, Type type, IDictionary<string, object> parameters = null, Action<dynamic> setItem = null)
        {
            Func<SqlCommand, List<object>> excute = (dbCommand) =>
            {
                var result = new List<object>();

                using (IDataReader dataReader = dbCommand.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        var item = dataReader.GetReaderData(type);
                        if (setItem != null)
                        {
                            setItem(item);
                        }
                        result.Add(item);
                    }
                }
                return result;
            };
            return CreateDbCommondAndExcute<List<object>>(commandText, parameters,
                excute);
        }

        /// <summary>
        /// Queries a single object (the last row read, if the query returns several).
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <param name="load">Custom row mapping</param>
        /// <returns></returns>
        public T ExecuteReader<T>(string commandText, IDictionary<string, object> parameters = null, Func<IDataReader, T> load = null) where T : class,new()
        {
            Func<SqlCommand, T> excute = (dbCommand) =>
            {
                var result = default(T);
                using (IDataReader reader = dbCommand.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        if (load == null)
                        {
                            load = (s) => { return s.GetReaderData<T>(); };
                        }
                        result = load(reader);
                    }
                    return result;
                }
            };
            return CreateDbCommondAndExcute<T>(commandText, parameters, excute);
        }

        /// <summary>
        /// Queries a scalar value such as a count.
        /// </summary>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <returns></returns>
        public object ExecuteScalar(string commandText, IDictionary<string, object> parameters = null)
        {
            Func<SqlCommand, object> excute = (dbCommand) =>
            {
                return dbCommand.ExecuteScalar();
            };
            return CreateDbCommondAndExcute(commandText, parameters, excute);
        }

        /// <summary>
        /// Creates a command and executes it.
        /// </summary>
        /// <typeparam name="TValue"></typeparam>
        /// <param name="commandText"></param>
        /// <param name="parameters"></param>
        /// <param name="excute"></param>
        /// <returns></returns>
        private TValue CreateDbCommondAndExcute<TValue>(string commandText,
            IDictionary<string, object> parameters, Func<SqlCommand, TValue> excute)
        {
            // Open the connection lazily on first use.
            if (Connection.State == ConnectionState.Closed) { Connection.Open(); }
            using (SqlCommand command = new SqlCommand())
            {
                command.CommandType = CommandType.Text;
                command.CommandText = commandText;
                command.Connection = Connection;
                command.SetParameters(parameters);
                return excute(command);
            }
        }

        /// <summary>
        /// Closes the connection.
        /// </summary>
        public void Dispose()
        {
            if (connection != null)
            {
                connection.Dispose(); // releases the underlying unmanaged connection
            }
        }
    }
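The context calls an extension method `GetReaderData<T>()` on the data reader, which is not shown. The following is a guess at what such a helper might do, namely copy each column of the current row into the property with the same name. The name matches the call above, but the body is an assumption, not the author's implementation. A `DataTable` supplies the reader so the sketch runs without a database.

```csharp
using System;
using System.Data;
using System.Reflection;

public class Member
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class DataReaderSketch
{
    // Maps the current row of the reader onto a new T, matching columns to properties by name.
    public static T GetReaderData<T>(this IDataReader reader) where T : class, new()
    {
        T item = new T();
        for (int i = 0; i < reader.FieldCount; i++)
        {
            PropertyInfo property = typeof(T).GetProperty(reader.GetName(i));
            // Ignore columns without a writable property, and leave defaults for NULL values.
            if (property == null || !property.CanWrite || reader.IsDBNull(i)) { continue; }
            // Assumes the column type matches the property type; no conversion is attempted.
            property.SetValue(item, reader.GetValue(i), null);
        }
        return item;
    }
}

public static class ReaderDemo
{
    public static void Main()
    {
        var table = new DataTable();
        table.Columns.Add("Name", typeof(string));
        table.Columns.Add("Age", typeof(int));
        table.Rows.Add("Tom", 20);

        using (IDataReader reader = table.CreateDataReader())
        {
            while (reader.Read())
            {
                Member member = reader.GetReaderData<Member>();
                Console.WriteLine(member.Name + " " + member.Age); // Tom 20
            }
        }
    }
}
```

A production version would likely cache the property lookups per type, since reflection on every row is comparatively slow.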

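When calling these methods, note that if one transaction involves several aggregates, the same unit of work instance must be passed to all of them, and Commit() must be called at the end of the method.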

public class Services : IService
    {
        private IMemberRespository member;

        private IUnitOfWork unitOfWork;

        public Services(IMemberRespository member, IUnitOfWork unitOfWork)
        {
            this.member = member;
            this.unitOfWork = unitOfWork;
        }

        /// <summary>
        /// Test case.
        /// </summary>
        public void Demo()
        {

            member.Test();

            unitOfWork.Commit();
        }
    }
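To illustrate the point about sharing one unit of work across aggregates, here is a small self-contained sketch. All types in it (`ISimpleUnitOfWork`, `ListUnitOfWork`, `OrderRepository`, `StockRepository`, `OrderService`) are hypothetical and cut down to the minimum; in a real application a dependency injection container would typically be configured to hand the same instance to every repository within one request.

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for IUnitOfWork, reduced to what the example needs.
public interface ISimpleUnitOfWork
{
    void Command(string commandText);
    void Commit();
}

// Records executed statements in a list instead of sending them to a database.
public class ListUnitOfWork : ISimpleUnitOfWork
{
    private readonly List<string> pending = new List<string>();
    private readonly List<string> executed = new List<string>();

    public IList<string> Executed { get { return executed; } }

    public void Command(string commandText) { pending.Add(commandText); }

    public void Commit()
    {
        executed.AddRange(pending);
        pending.Clear();
    }
}

public class OrderRepository
{
    private readonly ISimpleUnitOfWork unitOfWork;

    public OrderRepository(ISimpleUnitOfWork unitOfWork) { this.unitOfWork = unitOfWork; }

    public void Add() { unitOfWork.Command("insert into Orders (OrderNo) values (@OrderNo)"); }
}

public class StockRepository
{
    private readonly ISimpleUnitOfWork unitOfWork;

    public StockRepository(ISimpleUnitOfWork unitOfWork) { this.unitOfWork = unitOfWork; }

    public void Decrease() { unitOfWork.Command("update Stock set Quantity = Quantity - 1 where Sku = @Sku"); }
}

public class OrderService
{
    private readonly OrderRepository orders;
    private readonly StockRepository stock;
    private readonly ISimpleUnitOfWork unitOfWork;

    public OrderService(OrderRepository orders, StockRepository stock, ISimpleUnitOfWork unitOfWork)
    {
        this.orders = orders;
        this.stock = stock;
        this.unitOfWork = unitOfWork;
    }

    // Both repositories queue into the same unit of work; one Commit sends both changes.
    public void PlaceOrder()
    {
        orders.Add();
        stock.Decrease();
        unitOfWork.Commit();
    }
}

public static class SharedUnitOfWorkDemo
{
    public static void Main()
    {
        var unitOfWork = new ListUnitOfWork();
        var service = new OrderService(
            new OrderRepository(unitOfWork), new StockRepository(unitOfWork), unitOfWork);

        service.PlaceOrder();
        Console.WriteLine(unitOfWork.Executed.Count); // 2
    }
}
```

If the two repositories were given different unit of work instances, the service's `Commit()` would only flush the commands queued on its own instance and the other change would be silently lost.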

 
