手撸golang GO与微服务 ES-CQRS模式之1

Posted ioly

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸golang GO与微服务 ES-CQRS模式之1相关的知识,希望对你有一定的参考价值。

手撸golang GO与微服务 ES-CQRS模式之1

缘起

最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
git地址: https://gitee.com/ioly/learning.gooop

ES-CQRS模式

ES(Event Sourcing)事件溯源非常好理解,
指的是将每次的事件都记录下来,
而不是去记录对象的状态。
比如新建、修改等都会作为事件记录下来,
当需要最新的状态时,通过事件的堆叠来计算最新的状态。
按照事件溯源的模式进行架构设计,
就是事件驱动架构(Event DrivenArchitecture, EDA)。

命令查询职责分离(CQRS)最早来自Betrand Meyer写的
Object-OrientedSoftware Construction一书,
指的是命令查询分离(Command Query Separation,CQS)。
其基本思想是任何一个对象的方法都可以分为以下两大类:
▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。
▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。
CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。

目标(Day 1)

  • 根据ES-CQRS模式, 设计"TODO - 待办事宜"程序

设计

  • TodoDTO: 待办事宜数值对象
  • TodoCreatedEvent: 创建todo事件
  • TodoUpdatedEvent: 修改todo事件
  • TodoRemovedEvent: 删除todo事件
  • IEventBus: 事件总线接口
  • iTodoEventSerializer: 事件序列化到JSON数据的接口
  • iTodoReader: todo读取接口
  • iTodoWriter: todo写入接口
  • iJSONStore: json文件读写接口
  • tEventBus: 事件总线的实现
  • tTodoEventSerializer: 事件序列化到JSON的实现
  • tTodoWriter: 事件写入器的实现
  • tMockJSONStore: 虚拟的JSON文件读写实现
  • tTodoReader: 未完成

TodoDTO.go

待办事宜数值对象

package todo_app

type TodoDTO struct {
    NO int
    Title string
    Content string
}

TodoCreatedEvent.go

todo事项创建事件

package todo_app

type TodoCreatedEvent struct {
    Data *TodoDTO
}

TodoUpdatedEvent.go

todo事项修改事件

package todo_app

type TodoUpdatedEvent struct {
    Data *TodoDTO
}

TodoRemovedEvent.go

todo事项删除事件

package todo_app

type TodoRemovedEvent struct {
    NO int
}

IEventBus.go

事件总线接口

package todo_app


type EventHandleFunc func(e string, args interface{})
type EventHandler struct {
    ID string
    Handler EventHandleFunc
}

type IEventBus interface {
    Pub(e string, args interface{})
    Sub(e string, id string, handleFunc EventHandleFunc)
    Unsub(e string, id string)
}


const EventTodoCreated = "todo.created"
const EventTodoUpdated = "todo.updated"
const EventTodoRemoved = "todo.removed"

iTodoEventSerializer.go

事件序列化到JSON数据的接口

package todo_app

type iTodoEventSerializer interface {
    SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONData
    SerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONData
    SerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData
}

iTodoReader.go

todo读取接口

package todo_app

type iTodoReader interface {
    All() []*TodoDTO
}

iTodoWriter.go

todo写入接口

package todo_app

type iTodoWriter interface {
    HandleCreated(e *TodoCreatedEvent)
    HandleUpdated(e *TodoUpdatedEvent)
    HandleRemoved(e *TodoRemovedEvent)
}

iJSONStore.go

json文件读写接口

package todo_app

type iJSONStore interface {
    Load()
    Append(it *tJSONData)
}

tEventBus.go

事件总线的实现

package todo_app

import (
    "learning/gooop/saga/mqs/logger"
    "sync"
)

type tEventBus struct {
    rwmutex *sync.RWMutex
    items map[string][]*EventHandler
}


func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
    return &EventHandler{
        id, handleFunc,
    }
}

func newEventBus() IEventBus {
    it := new(tEventBus)
    it.init()
    return it
}

func (me *tEventBus) init() {
    me.rwmutex = new(sync.RWMutex)
    me.items = make(map[string][]*EventHandler)
}

func (me *tEventBus) Pub(e string, args interface{}) {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    handlers,ok := me.items[e]
    if ok {
        for _,it := range handlers {
            logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
            go it.Handler(e, args)
        }
    }
}

func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    handler := newEventHandler(id, handleFunc)
    handlers,ok := me.items[e]

    if ok {
        me.items[e] = append(handlers, handler)
    } else {
        me.items[e] = []*EventHandler{handler }
    }
}


func (me *tEventBus) Unsub(e string, id string) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    handlers,ok := me.items[e]
    if ok {
        for i,it := range handlers {
            if it.ID == id {
                lastI := len(handlers) - 1
                if i != lastI {
                    handlers[i], handlers[lastI] = handlers[lastI], handlers[i]
                }
                me.items[e] = handlers[:lastI]
            }
        }
    }
}

var GlobalEventBus = newEventBus()

tTodoEventSerializer.go

事件序列化到JSON的实现

package todo_app

type tTodoEventSerializer struct {
}

func newEventSeiralizer() iTodoEventSerializer {
    it := new(tTodoEventSerializer)
    return it
}


func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData {
    it := new(tJSONData)
    err := it.Set(TagCreated, v)
    if err != nil {
        return nil
    }
    return it
}

func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData {
    return me.serializeWithTag(TagCreated, e)
}

func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData {
    return me.serializeWithTag(TagUpdated, e)
}

func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData {
    return me.serializeWithTag(TagRemoved, e)
}


const TagCreated = 1
const TagUpdated = 2
const TagRemoved = 3

var gDefaultEventSerializer = newEventSeiralizer()

tTodoWriter.go

事件写入器的实现

package todo_app


type tTodoWriter struct {
}


func newTodoWriter() iTodoWriter {
    it := new(tTodoWriter)
    it.init()
    return it
}

func (me *tTodoWriter) init() {
    GlobalEventBus.Sub("todo.created", "", me.handleEvent)
}


func (me *tTodoWriter) handleEvent(e string, args interface{}) {
    switch e {
    case EventTodoCreated:
        if it,ok := args.(*TodoCreatedEvent);ok {
            me.HandleCreated(it)
        }
        break

    case EventTodoUpdated:
        if it,ok := args.(*TodoUpdatedEvent);ok {
            me.HandleUpdated(it)
        }
        break

    case EventTodoRemoved:
        if it,ok := args.(*TodoRemovedEvent);ok {
            me.HandleRemoved(it)
        }
        break
    }
}


func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) {
    j := gDefaultEventSerializer.SerializeCreatedEvent(e)
    if j != nil {
        MockJSONStore.Append(j)
    }
}

func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) {
    j := gDefaultEventSerializer.SerializeUpdatedEvent(e)
    if j != nil {
        MockJSONStore.Append(j)
    }
}

func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) {
    j := gDefaultEventSerializer.SerializeRemovedEvent(e)
    if j != nil {
        MockJSONStore.Append(j)
    }
}

tMockJSONStore.go

虚拟的JSON文件读写实现

package todo_app

import "sync"

type tMockJSONStore struct {
    rwmutex *sync.RWMutex
    once sync.Once
    items []*tJSONData
}

func newMockJSONStore() iJSONStore {
    it := new(tMockJSONStore)
    it.init()
    return it
}

func (me *tMockJSONStore) init() {
    me.rwmutex = new(sync.RWMutex)
    me.items = []*tJSONData{}
}


func (me *tMockJSONStore) Load() {
    me.once.Do(func() {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()

        for _,it := range me.items {
            switch it.Tag {
            case TagCreated:
                v := new(TodoCreatedEvent)
                e := it.Get(v)
                if e == nil {
                    GlobalEventBus.Pub(EventTodoCreated, e)
                }
                break

            case TagUpdated:
                v := new(TodoUpdatedEvent)
                e := it.Get(v)
                if e == nil {
                    GlobalEventBus.Pub(EventTodoUpdated, e)
                }
                break

            case TagRemoved:
                v := new(TodoRemovedEvent)
                e := it.Get(v)
                if e == nil {
                    GlobalEventBus.Pub(EventTodoRemoved, e)
                }
                break
            }

        }
    })
}

func (me *tMockJSONStore) Append(it *tJSONData) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    me.items = append(me.items, it)
}

var MockJSONStore = newMockJSONStore()

(未完待续)

以上是关于手撸golang GO与微服务 ES-CQRS模式之1的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang GO与微服务 聚合模式之1

手撸golang GO与微服务 聚合模式之2

手撸golang GO与微服务 Saga模式之8 集成测试

手撸golang GO与微服务 Saga模式之1

手撸golang GO与微服务 Saga模式之7

手撸golang GO与微服务 Saga模式之5