看门外汉如何实现:C#操作 MongoDB基本CURD的事务控制之 第二部分

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了看门外汉如何实现:C#操作 MongoDB基本CURD的事务控制之 第二部分相关的知识,希望对你有一定的参考价值。

 

第二部分 尝试解决BulkWrite(List<WriteModel<T>>)问题

    在上次发表的文章中,得到了一些很好的反馈,真切体会到写博文的好处,有高人指出两大问题,具体可以看看上篇中的评论,下面依然是发表一些个人见解,只做研究,并不保证解决实际问题。

这两大问题终究来说,是发生在BulkWrite(List<WriteModel<T>>)上,针对@ 从来不用 的问题,我试着找出影响的行数据还比对写入操作的数量,如果一致,则提交,如果不一致则回滚。

1.找出影响行数和实际操作数量    BulkWriteResult<T>有很多属性,我用的是ProcessedRequests.Count这个应该是反应的像mssql的影响的行数。而实际操作数量就是writer的个数。

2.备份元数   每添加一个writer之前在内存中或备份数据库中保存一份元数据,我这里是保存在内存中的,声明了几个不同类型的集合

            private List<TAggregate> beforeChange = new List<TAggregate>();//记录更新前的数据
        private List<Guid> beforeAdd = new List<Guid>();   //记录添加前的数据ID
        private List<TAggregate> beforeDelete = new List<TAggregate>();//记录数据删除前的数据

  然后在每添加一个writer之前,在对应的修改、添加、删除集合中添加元数据,这里看来必须要有数据库访问的了。没办法

 

            if (IsUseTransaction)
            {
                try
                {
                    beforeAdd.Add(entity.Id);//记录添加之前的数据的ID
                    writers.Add(new InsertOneModel<TAggregate>(entity));
                    isRollback = false;//控制是否回滚
                    return;
                }

   其它操作同理,后面我会把完整的代码贴出来的。先来分析一下。

3.处理提交事务逻辑   利用Collection.BulkWrite(writers)的返回值属性,找出实际影响的数据行数,这里我就按mssql的命名思路来了,同时如果若BulkWriteResult发生异常,我们也执行回滚

        #region 事务控制
        public void Commit()
        {
            if (!isRollback && writers.Count > 0)//如果不回滚,并且writers有数据
            {
                BulkWriteResult<TAggregate> result;
                try
                {
                    result = Collection.BulkWrite(writers);
                }
                catch (Exception)
                {
                    Rollback();//若BulkWriteResult发生异常
                    throw;
                }
                if(result.ProcessedRequests.Count!=writers.Count)//检查完成写入的数量,如果有误,回滚
                {
                    Rollback();
                }
                writers.Clear();//此时说明已成功提交,清空writers数据
                return;
            }
            Rollback();
        }

  

 4.回滚操作    回滚嘛,我们就来个反操作,根据不同的类型操作集合,遍历执行反操作写入数据库,至于这部分如果出问题,我现在还没时间搞,以后如果有需要,再改

        public void Rollback()
        {
            writers.Clear();//清空writers
            //执行反操作
            beforeDelete.ForEach(o =>
            {
                Collection.InsertOne(o);
            });
            beforeChange.ForEach(o =>
            {
                Collection.ReplaceOne(c => c.Id == o.Id, o);
            });
            beforeAdd.ForEach(o =>
            {
                Collection.DeleteOne(d => d.Id == o);
            });
            
        } 

5.修整后的Repostory

using EFAndMongoRepostory.Entity;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

namespace EFAndMongoRepostory
{
    public class MongoRepostory<TAggregate> where TAggregate :AggregateBase
    {
        #region 初始化及字段属性设置
        /// <summary>
        /// 获取集合
        /// </summary>
        protected IMongoCollection<TAggregate> Collection;
        /// <summary>
        /// 初始化,以类名作为集合名称
        /// </summary>
        /// <param name="collection"></param>
        public MongoRepostory()
        {
            this.Collection = MongoDbContext.GetMongoCollection<TAggregate>(typeof(TAggregate).Name);
        }

        private List<WriteModel<TAggregate>> writers = new List<WriteModel<TAggregate>>();//写入模型

        /// <summary>
        /// 指示是否起用事务,默认true
        /// </summary>
        public bool IsUseTransaction { get; set; } = true;

        private List<TAggregate> beforeChange = new List<TAggregate>();//记录更新前的数据
        private List<Guid> beforeAdd = new List<Guid>();   //记录添加前的数据ID
        private List<TAggregate> beforeDelete = new List<TAggregate>();//记录数据删除前的数据


        private bool isRollback = false;//回滚控制 
        #endregion

        #region 添加
        /// <summary>
        /// 添加一条数据
        /// </summary>
        /// <param name="entity"></param>
        public void Add(TAggregate entity)
        {
            if (entity == null)
                return;
            if (IsUseTransaction)
            {
                try
                {
                    beforeAdd.Add(entity.Id);//记录添加之前的数据的ID
                    writers.Add(new InsertOneModel<TAggregate>(entity));
                    isRollback = false;//控制是否回滚
                    return;
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.InsertOne(entity);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        /// <summary>
        /// 添加数据集合
        /// </summary>
        /// <param name="entities"></param>
        public void Add(IEnumerable<TAggregate> entities)
        {
            if (entities.Count() <= 0)
                return;
            if(IsUseTransaction)
            {
                try
                {
                    entities.ToList().ForEach(o =>
                            {
                                beforeAdd.Add(o.Id);
                                writers.Add(new InsertOneModel<TAggregate>(o));
                            });
                    isRollback = false;
                    return;
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.InsertMany(entities);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        #endregion

        #region 替换
        /// <summary>
        /// 替换一条过滤的数据(请确保此方法Id属性是不能变)
        /// </summary>
        /// <param name="filter">过滤条件</param>
        /// <param name="enity">目标数据(目标数据的Id值必为源数据的Id)</param>
        public void ReplaceOne(Expression<Func<TAggregate, bool>> filter, TAggregate enity)
        {
            if (enity == null)
                return;
            if (IsUseTransaction)
            {
                try
                {
                    //先记录修改之前的数据
                    beforeChange.Add(Collection.Find(Builders<TAggregate>.Filter.Where(filter)).FirstOrDefault());
                    writers.Add(new ReplaceOneModel<TAggregate>(Builders<TAggregate>.Filter.Where(filter), enity));
                    isRollback = false;
                    return;
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }

            try
            {
                Collection.ReplaceOne(filter, enity);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        /// <summary>
        /// 替换一条数据(请确保此方法Id属性是不能变)
        /// </summary>
        /// <param name="id">目标id</param>
        /// <param name="enity">目标数据(目标数据的Id值必为源数据的Id)</param>
        public void ReplaceById(Guid id, TAggregate enity)
        {
            if (enity == null)
                return;
            if(enity.Id!=id)
            {                
                isRollback = true;
                throw new Exception("the id can not change");
            }
            if(IsUseTransaction)
            {
                try
                {
                    beforeChange.Add(Collection.Find(Builders<TAggregate>.Filter.Eq(o => o.Id, id)).FirstOrDefault());
                    writers.Add(new ReplaceOneModel<TAggregate>(Builders<TAggregate>.Filter.Eq(o=>o.Id, id), enity));
                    isRollback = false;
                    return;
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.ReplaceOne(o => o.Id == id, enity);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        /// <summary>
        /// 查找一条数据并且替换
        /// </summary>
        /// <param name="id">目标数据的id</param>
        /// <param name="enity">更改后的数据</param>
        /// <returns>更改前的数据</returns>
        public TAggregate FindOneAndReplace(Guid id, TAggregate enity)
        {
            if (enity == null)
                return null;
            if (enity.Id != id)
            {
                throw new Exception("the id can not change");
            }
            
            return Collection.FindOneAndReplace(o => o.Id == id, enity);
        }
        /// <summary>
        /// 查找一条数据并且替换
        /// </summary>
        /// <param name="filter">条件</param>
        /// <param name="enity">更改后的数据</param>
        /// <returns>更改前的数据</returns>
        public TAggregate FindOneAndReplace(Expression<Func<TAggregate,bool>>filter, TAggregate enity)
        {
            if (enity == null)
                return null;
            return Collection.FindOneAndReplace(filter, enity);
        }

        #endregion

        #region 移除
        /// <summary>
        /// 根据过滤删除数据
        /// </summary>
        /// <param name="filter"></param>
        public void Remoe(Expression<Func<TAggregate, bool>> filter)
        {
            if (IsUseTransaction)
            {
                try
                {
                    if(Collection.Find(filter).FirstOrDefault()==null)//如果要删除的数据不存在数据库中
                    {
                        throw new Exception("要删除的数据不存在数据库中");
                    }
                    beforeDelete.Add(Collection.Find(filter).FirstOrDefault());
                    writers.Add(new DeleteOneModel<TAggregate>(Builders<TAggregate>.Filter.Where(filter)));
                    isRollback = false;
                    return;
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.DeleteMany(filter);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        public void RemoveById(Guid id)
        {
            if (IsUseTransaction)
            {
                try
                {
                    beforeDelete.Add(Collection.Find(Builders<TAggregate>.Filter.Eq(o => o.Id, id)).FirstOrDefault());
                    writers.Add(new DeleteOneModel<TAggregate>(Builders<TAggregate>.Filter.Eq(o => o.Id, id)));
                    isRollback = false;
                    return;
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.DeleteOne(o => o.Id == id);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        #endregion

        #region 更新
        /// <summary>
        /// 过滤数据,执行更新操作(如不便使用,请用Replace相关的方法代替)
        /// 
        /// 一般用replace来代替这个方法。其实这个功能还算强大的,可以很自由修改多个属性
        /// 关健是set参数比较不好配置,并且如果用此方法,调用端必须引用相关的DLL,set举例如下
        /// set = Builders<TAggregate>.Update.Update.Set(o => o.Number, 1).Set(o => o.Description, "002.thml");
        /// set作用:将指定TAggregate类型的实例对象的Number属性值更改为1,Description属性值改为"002.thml"
        /// 说明:Builders<TAggregate>.Update返回类型为UpdateDefinitionBuilder<TAggregate>,这个类有很多静态
        /// 方法,Set()是其中一个,要求传入一个func的表达示,以指示当前要修改的,TAggregate类型中的属性类型,
        /// 另一个参数就是这个属性的值。
        /// 
        /// Builders<TAggregate>类有很多属性,返回很多如UpdateDefinitionBuilder<TAggregate>的很有用帮助类型
        /// 可以能参CSharpDriver-2.2.3.chm文件 下载MongoDB-CSharpDriver时带有些文件 
        /// 或从官网https://docs.mongodb.com/ecosystem/drivers/csharp/看看
        /// 
        /// </summary>
        /// <param name="filter">过滤条件</param>
        /// <param name="set">修改设置</param>
        public void Update(Expression<Func<TAggregate, bool>> filter, UpdateDefinition<TAggregate> set)
        {
            if (set == null)
                return;
            if (IsUseTransaction)//如果启用事务
            {
                try
                {
                    beforeChange.Add(Collection.Find(filter).FirstOrDefault());
                    writers.Add(new UpdateManyModel<TAggregate>(Builders<TAggregate>.Filter.Where(filter), set));
                    isRollback = false;//不回滚
                    return;//不执行后继操作
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.UpdateMany(filter, set);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        /// <summary>
        /// 过滤数据,执行更新操作(如不便使用,请用Replace相关的方法代替)
        /// 
        /// 一般用replace来代替这个方法。其实这个功能还算强大的,可以很自由修改多个属性
        /// 关健是set参数比较不好配置,并且如果用此方法,调用端必须引用相关的DLL,set举例如下
        /// set = Builders<TAggregate>.Update.Update.Set(o => o.Number, 1).Set(o => o.Description, "002.thml");
        /// set作用:将指定TAggregate类型的实例对象的Number属性值更改为1,Description属性值改为"002.thml"
        /// 说明:Builders<TAggregate>.Update返回类型为UpdateDefinitionBuilder<TAggregate>,这个类有很多静态
        /// 方法,Set()是其中一个,要求传入一个func的表达示,以指示当前要修改的,TAggregate类型中的属性类型,
        /// 另一个参数就是这个属性的值。
        /// 
        /// Builders<TAggregate>类有很多属性,返回很多如UpdateDefinitionBuilder<TAggregate>的很有用帮助类型
        /// 可以能参CSharpDriver-2.2.3.chm文件 下载MongoDB-CSharpDriver时带有些文件 
        /// 或从官网https://docs.mongodb.com/ecosystem/drivers/csharp/看看
        /// 
        /// </summary>
        /// <param name="id">找出指定的id数据</param>
        /// <param name="set">修改设置</param>
        public void Update(Guid id, UpdateDefinition<TAggregate> set)
        {
            if (set == null)
                return;
            if (IsUseTransaction)//如果启用事务
            {
                try
                {
                    beforeChange.Add(Collection.Find(Builders<TAggregate>.Filter.Eq(o => o.Id, id)).FirstOrDefault());
                    writers.Add(new UpdateManyModel<TAggregate>(Builders<TAggregate>.Filter.Eq(o => o.Id, id), set));
                    isRollback = false;//不回滚
                    return;//不执行后继操作
                }
                catch (Exception ex)
                {
                    isRollback = true;
                    throw new Exception(ex.Message);
                }
            }
            try
            {
                Collection.UpdateMany(o => o.Id == id, set);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }
        #endregion

        #region 事务控制
        public void Commit()
        {
            if (!isRollback && writers.Count > 0)//如果不回滚,并且writers有数据
            {
                BulkWriteResult<TAggregate> result;
                try
                {
                    result = Collection.BulkWrite(writers);
                }
                catch (Exception)
                {
                    Rollback();//若BulkWriteResult发生异常
                    throw;
                }
                if(result.ProcessedRequests.Count!=writers.Count)//检查完成写入的数量,如果有误,回滚
                {
                    Rollback();
                }
                writers.Clear();//此时说明已成功提交,清空writers数据
                return;
            }
            Rollback();
        }
        public void Rollback()
        {
            writers.Clear();//清空writers
            //执行反操作
            beforeDelete.ForEach(o =>
            {
                Collection.InsertOne(o);
            });
            beforeChange.ForEach(o =>
            {
                Collection.ReplaceOne(c => c.Id == o.Id, o);
            });
            beforeAdd.ForEach(o =>
            {
                Collection.DeleteOne(d => d.Id == o);
            });
            
        } 
        #endregion

        #region 查询
        /// <summary>
        /// 查找所有数据集合
        /// </summary>
        /// <returns></returns>
        public IQueryable<TAggregate> FindAll()
        {
            return Collection.AsQueryable();
        }
        /// <summary>
        /// 根据Id查找一条数据
        /// </summary>
        /// <param name="id"></param>
        /// <returns></returns>
        public TAggregate FindById(Guid id)
        {
            var find = Collection.Find(o => o.Id == id);
            if (!find.Any())
                return null;
            return find.FirstOrDefault();
        }
        /// <summary>
        /// 根据过滤条件找出符合条件的集合
        /// </summary>
        /// <param name="filter"></param>
        /// <returns></returns>
        public List<TAggregate> FindByFilter(Expression<Func<TAggregate, bool>> filter)
        {
            var find = Collection.Find(filter);
            if (!find.Any())
                return null;
            return find.ToList();
        }
        /// <summary>
        /// 根据过滤条件找出一条数据
        /// </summary>
        /// <param name="filter"></param>
        /// <returns></returns>
        public TAggregate FindOne(Expression<Func<TAggregate, bool>> filter)
        {
            return Collection.Find(filter).FirstOrDefault();
        }
        #endregion

        

        /// <summary>
        /// 根据聚合类ID添加导航数据到 导航集合(中间表)
        /// </summary>
        /// <typeparam name="TNav">导航类</typeparam>
        /// <param name="nav">提供参数时直接new一个具体的nav类就行了</param>
        /// <param name="filter"></param>
        /// <param name="foreignKey"></param>
        public void AddByAggregate<TNav>(TNav nav, Expression<Func<TAggregate, bool>> filter, Guid foreignKey)
                                        where TNav : NavgationBase
        {
            //导航类的集合
            var navCollection = MongoDbContext.GetMongoCollection<TNav>(typeof(TNav).Name);
            //遍历当前集合中所有符合条件的数据
            Collection.Find(filter).ToList().ForEach(o =>
            {
                //将导航类的属性赋相应的值
                nav.AggregateId = foreignKey;
                nav.ValueObjectId = o.Id;

                //插入到数据库
                navCollection.InsertOne(nav);
            });
        }

        
    }
}

6.测试一下   马上就要上班了,我也不啰嗦了,有注释

 

using EFAndMongoRepostory;
using EFAndMongoRepostory.Entity;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;

namespace MongoTest
{
    class Program
    {
        static void Main(string[] args)
        {

            #region 初始化
            var db = MongoDbContext.SetMongoDatabase("mongodb://localhost:27017", "MongoTest");
            #endregion
            

            #region 准备数据
            List<Role> rList = new List<Role>
                    {
                        new Role
                        {
                            Name="r001", Description="rd001"
                        },
                        new Role
                        {
                            Name="r002",Description="rd002"
                        },
                        new Role
                        {
                            Name="r003",Description="rd003"
                        }
                    };
            List<User> uList = new List<User>
            {
                new User
                {
                    Name="001", Pwd="pwd001"
                },
                new User
                {
                    Name="002",  Pwd="pwd002"
                }
                ,
                new User
                {
                    Name="003", Pwd="pwd003"
                }
                ,
                new User
                {
                    Name="004", Pwd="pwd004"
                }
            };
            List<Permission> pList = new List<Permission>
            {
                 new Permission {  Name="001", Url="001.html" },
                 new Permission {  Name="002", Url="002.html" },
                 new Permission {  Name="003", Url="003.html" },
                 new Permission {  Name="004", Url="004.html" },
                 new Permission {  Name="005", Url="005.html" }
            };
            #endregion

            MongoRepostory<User> repostory = new MongoRepostory<User>();
            //清空集合
            db.DropCollection(typeof(User).Name);

            //执行一次批量添加
            repostory.Add(uList);

            //提交后查询所有数据
            repostory.Commit();            
            repostory.FindAll().ToList().ForEach(o =>
            {
                Console.WriteLine(o.Name + ":" + o.Pwd + ":" + o.Number);
            });

            //执行一次插入操作
            repostory.Add(new User { Name = "005", Pwd = "uPwd005", Number = 5 });

            //执行一次替换操作
            var user = repostory.FindOne(o => o.Name == "001");
            user.Pwd = "wd001"; user.Number = 10; user.Name = "u001";
            repostory.ReplaceOne(o => o.Name == "001", user);



            var update2 = Builders<User>.Update.Set(o => o.Name, "u002").Set(o => o.Pwd, "wd002").Set(o => o.Number, 20);
            var update3 = Builders<User>.Update.Set(o => o.Name, "u003").Set(o => o.Pwd, "wd003").Set(o => o.Number, 30);
            var update4 = Builders<User>.Update.Set(o => o.Name, "u004").Set(o => o.Pwd, "wd004").Set(o => o.Number, 40);

            //执行3次更新操作
            repostory.Update(o => o.Name == "002", update2);
            repostory.Update(o => o.Name == "003", update3);
            repostory.Update(o => o.Name == "004", update4);

            //执行一次删除操作
            var u = repostory.FindOne(o => o.Name == "002");
            repostory.Remoe(o => o.Id==u.Id);

            //提交
            repostory.Commit();

            //

以上是关于看门外汉如何实现:C#操作 MongoDB基本CURD的事务控制之 第二部分的主要内容,如果未能解决你的问题,请参考以下文章

MongoDB的C#驱动基本使用

网络通信IO的演变过程(一个门外汉的理解)

C#官方文档阅读笔记--C#语言和.NET简介

C#操作MongoDB实现自增列怎么写

MongoDB简单使用 —— 基本操作

MongoDB简单使用 —— 基本操作