一种基于CQRS的实践实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一种基于CQRS的实践实现相关的知识,希望对你有一定的参考价值。
在CQRS中,查询方面,直接通过方法查询数据库,然后通过DTO将数据返回。操作(Command)方面,通过发送Command实现,由CommandBus处理特定的Command,然后由Command将特定的Event发布到EventBus上,然后EventBus使用特定的Handler来处理事件,执行一些诸如,修改,删除,更新等操作。这里,所有与Command相关的操作都通过Event实现。这样我们可以通过记录Event来记录系统的运行历史记录,并且能够方便的回滚到某一历史状态。Event Sourcing就是用来进行存储和管理事件的。
CQRS的简单实现
CQRS模式在思想上简洁明了,但是实现上还是有些复杂。它涉及到DDD,以及Event Sourcing,下面用一个在线记日志(Diary)系统,实现了日志的增删改查功能。整体结构如下:
上图很清晰的说明了CQRS在读写方面的分离,在读方面,通过QueryFacade到数据库里去读取数据,这个库有可能是ReportingDB。在写方面,比较复杂,操作通过Command发送到CommandBus上,然后特定的CommandHandler处理请求,产生对应的Event,将Eevnt持久化后,通过EventBus特定的EevntHandler对数据库进行修改等操作。
整体结构如下:
由三个项目构成,Diary.CQRS包含了所有的Domain和消息对象。Configuration通过使用一个名为StructMap的IOC来初始化一些变量方便Web调用,Web是一个简单的MVC3项目,在Controller中有与CQRS交互的代码。
下面分别看Query和Command方面的实现:
Query方向的实现
查询方面很简单,日志列表和明细获取就是简单的查询。下面先看列表查询部分的代码。
public ActionResult Index() { ViewBag.Model = ServiceLocator.ReportDatabase.GetItems(); return View(); } public ActionResult Edit(Guid id) { var item = ServiceLocator.ReportDatabase.GetById(id); var model = new DiaryItemDto() { Description = item.Description, From = item.From, Id = item.Id, Title = item.Title, To = item.To, Version = item.Version }; return View(model); }
ReportDatabase的GetItems和GetById(id)方法就是简单的查询,从命名可以看出他是ReportDatabase。
public class ReportDatabase : IReportDatabase { static List<DiaryItemDto> items = new List<DiaryItemDto>(); public DiaryItemDto GetById(Guid id) { return items.Where(a => a.Id == id).FirstOrDefault(); } public void Add(DiaryItemDto item) { items.Add(item); } public void Delete(Guid id) { items.RemoveAll(i => i.Id == id); } public List<DiaryItemDto> GetItems() { return items; } }
ReportDataBase只是在内部维护了一个List的DiaryItemDto列表。在使用的时候,是通过IRepositoryDatabase对其进行操作的,这样便于mock代码。
Query方面的代码很简单。在实际的应用中,这一块就是直接对DB进行查询,然后通过DTO对象返回,这个DB可能是应对特定场景的报表数据库,这样可以提升查询性能。
Command方向的实现
Command的实现比较复杂,下面以简单的创建一个新的日志来说明。
在MVC的Control中,可以看到Add的Controller中只调用了一句话:
[HttpPost] public ActionResult Add(DiaryItemDto item) { ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To)); return RedirectToAction("Index"); }
首先声明了一个CreateItemCommand,这个Command只是保存了一些必要的信息。
public class CreateItemCommand:Command { public string Title { get; internal set; } public string Description { get;internal set; } public DateTime From { get; internal set; } public DateTime To { get; internal set; } public CreateItemCommand(Guid aggregateId, string title, string description,int version,DateTime from, DateTime to) : base(aggregateId,version) { Title = title; Description = description; From = from; To = to; } }
然后将Command发送到了CommandBus上,其实就是让CommandBus来选择合适的CommandHandler来处理。
public class CommandBus:ICommandBus { private readonly ICommandHandlerFactory _commandHandlerFactory; public CommandBus(ICommandHandlerFactory commandHandlerFactory) { _commandHandlerFactory = commandHandlerFactory; } public void Send<T>(T command) where T : Command { var handler = _commandHandlerFactory.GetHandler<T>(); if (handler != null) { handler.Execute(command); } else { throw new UnregisteredDomainCommandException("no handler registered"); } } }
这个里面需要值得注意的是CommandHandlerFactory这个类型的GetHandler方法,他接受一个类型为T的泛型,这里就是我们之前传入的CreateItemCommand。来看他的GetHandler方法。
public class StructureMapCommandHandlerFactory : ICommandHandlerFactory { public ICommandHandler<T> GetHandler<T>() where T : Command { var handlers = GetHandlerTypes<T>().ToList(); var cmdHandler = handlers.Select(handler => (ICommandHandler<T>)ObjectFactory.GetInstance(handler)).FirstOrDefault(); return cmdHandler; } private IEnumerable<Type> GetHandlerTypes<T>() where T : Command { var handlers = typeof(ICommandHandler<>).Assembly.GetExportedTypes() .Where(x => x.GetInterfaces() .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ICommandHandler<>) )) .Where(h=>h.GetInterfaces() .Any(ii=>ii.GetGenericArguments() .Any(aa=>aa==typeof(T)))).ToList(); return handlers; } }
这里可以看到,他首先查找当前的程序集中(ICommandHandler)所在的程序集中的所有的实现了ICommandHandler的接口的类型,然后在所有的类型找查找实现了该泛型接口并且泛型的类型参数类型为T类型的所有类型。以上面的代码为例,就是要找出实现了ICommandHandler<CreateItemCommand>接口的类型。可以看到就是CreateItemCommandHandler类型。
public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand> { private IRepository<DiaryItem> _repository; public CreateItemCommandHandler(IRepository<DiaryItem> repository) { _repository = repository; } public void Execute(CreateItemCommand command) { if (command == null) { throw new ArgumentNullException("command"); } if (_repository == null) { throw new InvalidOperationException("Repository is not initialized."); } var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To); aggregate.Version = -1; _repository.Save(aggregate, aggregate.Version); } }
找到之后然后使用IOC实例化了该对象返回。
现在CommandBus中,找到了处理特定Command的Handler。然后执行该类型的Execute方法。
可以看到在该类型中实例化了一个名为aggregate的DiaryItem对象。这个和我们之前查询所用到的DiaryItemDto有所不同,这个一个领域对象,里面包含了一系列事件。
public class DiaryItem : AggregateRoot, IHandle<ItemCreatedEvent>, IHandle<ItemRenamedEvent>, IHandle<ItemFromChangedEvent>, IHandle<ItemToChangedEvent>, IHandle<ItemDescriptionChangedEvent>, IOriginator { public string Title { get; set; } public DateTime From { get; set; } public DateTime To { get; set; } public string Description { get; set; } public DiaryItem() { } public DiaryItem(Guid id,string title, string description, DateTime from, DateTime to) { ApplyChange(new ItemCreatedEvent(id, title,description, from, to)); } public void ChangeTitle(string title) { ApplyChange(new ItemRenamedEvent(Id, title)); } public void Handle(ItemCreatedEvent e) { Title = e.Title; From = e.From; To = e.To; Id = e.AggregateId; Description = e.Description; Version = e.Version; } public void Handle(ItemRenamedEvent e) { Title = e.Title; } ... }
ItemCreatedEvent 事件的定义如下,其实就是用来存储传输过程中需要用到的数据。
public class ItemCreatedEvent:Event { public string Title { get; internal set; } public DateTime From { get; internal set; } public DateTime To { get; internal set; } public string Description { get;internal set; } public ItemCreatedEvent(Guid aggregateId, string title , string description, DateTime from, DateTime to) { AggregateId = aggregateId; Title = title; From = from; To = to; Description = description; } }
可以看到在Domain对象中,除了定义基本的字段外,还定义了一些相应的事件,比如在构造函数中,实际上是发起了一个名为ItemCreateEvent的事件,同时还定义了处理时间的逻辑,这些逻辑都放在名为Handle的接口方法发,例如ItemCerateEvent的处理方法为Handle(ItemCreateEvent)方法。
ApplyChange方法在AggregateRoot对象中,他是聚集根,这是DDD中的概念。通过这个根可以串起所有对象。 该类实现了IEventProvider接口,他保存了所有在_changes中的所有没有提交的变更,其中的ApplyChange的用来为特定的Event查找Eventhandler的方法:
public abstract class AggregateRoot : IEventProvider { private readonly List<Event> _changes; public Guid Id { get; internal set; } public int Version { get; internal set; } public int EventVersion { get; protected set; } protected AggregateRoot() { _changes = new List<Event>(); } public IEnumerable<Event> GetUncommittedChanges() { return _changes; } public void MarkChangesAsCommitted() { _changes.Clear(); } public void LoadsFromHistory(IEnumerable<Event> history) { foreach (var e in history) ApplyChange(e, false); Version = history.Last().Version; EventVersion = Version; } protected void ApplyChange(Event @event) { ApplyChange(@event, true); } private void ApplyChange(Event @event, bool isNew) { dynamic d = this; d.Handle(Converter.ChangeTo(@event, @event.GetType())); if (isNew) { _changes.Add(@event); } } }
在ApplyChange的实现中,this其实就是对应的实现了AggregateRoot的DiaryItem的Domain对象,调用的Handle方法就是我们之前在DiaryItem中定义的行为。然后将该event保存在内部的未提交的事件列表中。相关的信息及事件都保存在了定义的aggregate对象中并返回。
然后Command继续执行,然后调用了_repository.Save(aggregate, aggregate.Version);这个方法。先看这个Repository对象。
public class Repository<T> : IRepository<T> where T : AggregateRoot, new() { private readonly IEventStorage _storage; private static object _lockStorage = new object(); public Repository(IEventStorage storage) { _storage = storage; } public void Save(AggregateRoot aggregate, int expectedVersion) { if (aggregate.GetUncommittedChanges().Any()) { lock (_lockStorage) { var item = new T(); if (expectedVersion != -1) { item = GetById(aggregate.Id); if (item.Version != expectedVersion) { throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",item.Id)); } } _storage.Save(aggregate); } } } public T GetById(Guid id) { IEnumerable<Event> events; var memento = _storage.GetMemento<BaseMemento>(id); if (memento != null) { events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version); } else { events = _storage.GetEvents(id); } var obj = new T(); if(memento!=null) ((IOriginator)obj).SetMemento(memento); obj.LoadsFromHistory(events); return obj; } }
这个方法主要是用来对事件进行持久化的。 所有的聚合的变动都会存在该Repository中,首先,检查当前的聚合是否和之前存储在storage中的聚合一致,如果不一致,则表示对象在其他地方被更改过,抛出ConcurrencyException,否则将该变动保存在Event Storage中。
IEventStorage用来存储所有的事件,其实现类型为InMemoryEventStorage。
public class InMemoryEventStorage:IEventStorage { private List<Event> _events; private List<BaseMemento> _mementos; private readonly IEventBus _eventBus; public InMemoryEventStorage(IEventBus eventBus) { _events = new List<Event>(); _mementos = new List<BaseMemento>(); _eventBus = eventBus; } public IEnumerable<Event> GetEvents(Guid aggregateId) { var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p); if (events.Count() == 0) { throw new AggregateNotFoundException(string.Format("Aggregate with Id: {0} was not found", aggregateId)); } return events; } public void Save(AggregateRoot aggregate) { var uncommittedChanges = aggregate.GetUncommittedChanges(); var version = aggregate.Version; foreach (var @event in uncommittedChanges) { version++; if (version > 2) { if (version % 3 == 0) { var originator = (IOriginator)aggregate; var memento = originator.GetMemento(); memento.Version = version; SaveMemento(memento); } } @event.Version=version; _events.Add(@event); } foreach (var @event in uncommittedChanges) { var desEvent = Converter.ChangeTo(@event, @event.GetType()); _eventBus.Publish(desEvent); } } public T GetMemento<T>(Guid aggregateId) where T : BaseMemento { var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault(); if (memento != null) return (T) memento; return null; } public void SaveMemento(BaseMemento memento) { _mementos.Add(memento); } }
在GetEvent方法中,会找到所有的聚合根Id相关的事件。在Save方法中,将所有的事件保存在内存中,然后每隔三个事件建立一个快照。可以看到这里面使用了备忘录模式。
然后在foreach循环中,对于所有的没有提交的变更,EventBus将该事件发布出去。
现在,所有的发生变更的事件已经记录下来了。事件已经被发布到EventBus上,然后对应的EventHandler再处理对应的事件,然后与DB交互。现在来看EventBus的Publish方法。
public class EventBus:IEventBus { private IEventHandlerFactory _eventHandlerFactory; public EventBus(IEventHandlerFactory eventHandlerFactory) { _eventHandlerFactory = eventHandlerFactory; } public void Publish<T>(T @event) where T : Event { var handlers = _eventHandlerFactory.GetHandlers<T>(); foreach (var eventHandler in handlers) { eventHandler.Handle(@event); } } }
可以看到EventBus的Publish和CommandBus中的Send方法很相似,都是首先通过EventHandlerFactory查找对应Event的Handler,然后调用其Handler方法。比如
public class StructureMapEventHandlerFactory : IEventHandlerFactory { public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event { var handlers = GetHandlerType<T>(); var lstHandlers = handlers.Select(handler => (IEventHandler<T>) ObjectFactory.GetInstance(handler)).ToList(); return lstHandlers; } private static IEnumerable<Type> GetHandlerType<T>() where T : Event { var handlers = typeof(IEventHandler<>).Assembly.GetExportedTypes() .Where(x => x.GetInterfaces() .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(IEventHandler<>))) .Where(h => h.GetInterfaces() .Any(ii => ii.GetGenericArguments() .Any(aa => aa == typeof(T)))) .ToList(); return handlers; } }
然后返回并实例化了ItemCreatedEventHandler 对象,该对象的实现如下:
public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent> { private readonly IReportDatabase _reportDatabase; public ItemCreatedEventHandler(IReportDatabase reportDatabase) { _reportDatabase = reportDatabase; } public void Handle(ItemCreatedEvent handle) { DiaryItemDto item = new DiaryItemDto() { Id = handle.AggregateId, Description = handle.Description, From = handle.From, Title = handle.Title, To=handle.To, Version = handle.Version }; _reportDatabase.Add(item); } }
可以看到在Handler方法中,从事件中获取参数,然后新建DTO对象,然后将该对象更新到DB中。
到此,整个Command执行完成。
结语
CQRS是一种思想很简单清晰的设计模式,他通过在业务上分离操作和查询来使得系统具有更好的可扩展性及性能,使得能够对系统的不同部分进行扩展和优化。在CQRS中,所有的涉及到对DB的操作都是通过发送Command,然后特定的Command触发对应事件来完成操作,这个过程是异步的,并且所有涉及到对系统的变更行为都包含在具体的事件中,结合Eventing Source模式,可以记录下所有的事件,而不是以往的某一点的数据信息,这些信息可以作为系统的操作日志,可以来对系统进行回退或者重放。
以上是关于一种基于CQRS的实践实现的主要内容,如果未能解决你的问题,请参考以下文章
3.16 Go微服务实战(微服务理论) --- Go语言基于ES-CQRS的微服务实践
3.16 Go微服务实战(微服务理论) --- Go语言基于ES-CQRS的微服务实践