手撸golang GO与微服务 ES-CQRS模式之2

Posted ioly

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸golang GO与微服务 ES-CQRS模式之2相关的知识,希望对你有一定的参考价值。

手撸golang GO与微服务 ES-CQRS模式之2

缘起

最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee:

  • https://gitee.com/ioly/learning.gooop

ES-CQRS模式

ES(Event Sourcing)事件溯源非常好理解,
指的是将每次的事件都记录下来,
而不是去记录对象的状态。
比如新建、修改等都会作为事件记录下来,
当需要最新的状态时,通过事件的堆叠来计算最新的状态。
按照事件溯源的模式进行架构设计,
就是事件驱动架构(Event DrivenArchitecture, EDA)。

命令查询职责分离(CQRS)最早来自Betrand Meyer写的
Object-OrientedSoftware Construction一书,
指的是命令查询分离(Command Query Separation,CQS)。
其基本思想是任何一个对象的方法都可以分为以下两大类:
▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。
▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。
CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。

目标(Day 2)

  • 根据ES-CQRS模式, 大幅重构Day 1的设计, 并进行单元测试

设计

  • TodoDTO: 待办事宜数值对象
  • OperationTag: todo写入事件的类型标记
  • TodoEvent: todo写入事件
  • ClassTag: json序列化的类型标记
  • tJSONData: json序列化的数据容器
  • IEventBus: 事件总线接口
  • iTodoEventSerializer: 事件序列化到JSON数据的接口
  • iTodoReader: todo读取接口
  • iTodoWriter: todo写入接口
  • iJSONStore: json文件读写接口
  • ITodoService: todo待办事宜服务接口
  • tEventBus: 事件总线的实现
  • tTodoEventSerializer: 事件序列化到JSON的实现
  • tTodoWriter: 事件写入器的实现, 监听write指令, 并持久化到json存储
  • tMockJSONStore: 虚拟的JSON文件读写实现
  • tTodoReader: 待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态
  • tMockTodoService: 待办事宜服务的实现

单元测试

todo_app_test.go

package es_cqrs

import (
    td "learning/gooop/es_cqrs/todo_app"
    "testing"
)

func fnAssertTrue (t *testing.T, b bool, msg string) {
    if !b {
        t.Fatal(msg)
    }
}

func Test_TodoApp(t *testing.T) {
    t1 := &td.TodoDTO{ 1, "title-1", "content-1" }
    td.MockTodoService.Create(t1)

    all := td.MockTodoService.GetAll()
    fnAssertTrue(t, len(all) == 1, "expecting 1 item")
    fnAssertTrue(t, all[0].Title == t1.Title, "expecting " + t1.Title)
    t.Log("pass creating")

    t1.Content = "content-1 updated"
    t1.Title = "title-1 updated"
    td.MockTodoService.Update(t1)
    all = td.MockTodoService.GetAll()
    fnAssertTrue(t, len(all) == 1, "expecting 1 item")
    fnAssertTrue(t, all[0].Content == t1.Content, "expecting " + t1.Content)
    t.Log("pass updating")

    td.MockTodoService.Delete(t1)
    all = td.MockTodoService.GetAll()
    fnAssertTrue(t, len(all) == 0, "expecting 0 items")
    t.Log("pass deleting")
}

测试输出

$ go test -v todo_app_test.go 
=== RUN   Test_TodoApp
22:38:08.180382833 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180533659 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}
22:38:08.180539669 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180552255 tTodoReader.items: [&{1 title-1 content-1}]
22:38:08.180557245 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180560995 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
    todo_app_test.go:21: pass creating
22:38:08.180580644 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180604465 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
22:38:08.180612665 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180618512 tTodoReader.items: [&{1 title-1 updated content-1 updated}]
22:38:08.18062244 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180626445 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
    todo_app_test.go:29: pass updating
22:38:08.180642172 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180656612 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}, {"Tag":3,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
22:38:08.180669129 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180672774 tTodoReader.items: []
22:38:08.180675952 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180679309 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
    todo_app_test.go:34: pass deleting
--- PASS: Test_TodoApp (0.00s)
PASS
ok      command-line-arguments  0.002s

TodoDTO.go

待办事宜数值对象

package todo_app

type TodoDTO struct {
    NO      int
    Title   string
    Content string
}

func (me *TodoDTO) Clone() *TodoDTO {
    return &TodoDTO{
        me.NO, me.Title, me.Content,
    }
}

OperationTag.go

todo写入事件的类型标记

package todo_app


type OperationTag int

const OPCreated OperationTag = 1
const OPUpdated OperationTag = 2
const OPDeleted OperationTag = 3

TodoEvent.go

todo写入事件

package todo_app

type TodoEvent struct {
    Tag OperationTag
    Data *TodoDTO
}

ClassTag.go

json序列化的类型标记

package todo_app

type ClassTag int

const TodoEventClass ClassTag = 1

tJSONData.go

json序列化的数据容器

package todo_app

import "encoding/json"

type tJSONData struct {
    Tag     ClassTag
    Content []byte
}



func (me *tJSONData) Set(tag ClassTag, it interface{}) error {
    me.Tag = tag

    j, e := json.Marshal(it)
    if e != nil {
        return e
    }
    me.Content = j
    return nil
}

func (me *tJSONData) Get(it interface{}) error {
    return json.Unmarshal(me.Content, it)
}

IEventBus.go

事件总线接口

package todo_app

type EventHandleFunc func(e string, args interface{})
type EventHandler struct {
    ID      string
    Handler EventHandleFunc
}

type IEventBus interface {
    Pub(e string, args interface{})
    Sub(e string, id string, handleFunc EventHandleFunc)
    Unsub(e string, id string)
}

const EventWriteTodoCmd = "todo.write.cmd"
const EventReadTodoCmd = "todo.read.cmd"
const EventReadTodoRet = "todo.read.ret"
const EventLoadTodoCmd = "todo.load.cmd"

iTodoEventSerializer.go

事件序列化到JSON数据的接口

package todo_app

type iTodoEventSerializer interface {
    Serialize(it *TodoEvent) *tJSONData
}

iTodoReader.go

todo读取接口

package todo_app

type iTodoReader interface {
    All() []*TodoDTO
    HandleTodoEvent(e *TodoEvent)
}

iTodoWriter.go

todo写入接口

package todo_app

type iTodoWriter interface {
    HandleTodoEvent(e *TodoEvent)
}

iJSONStore.go

json文件读写接口

package todo_app

type iJSONStore interface {
    Load()
    Append(it *tJSONData)
}

ITodoService.go

todo待办事宜服务接口

package todo_app

type ITodoService interface {
    Create(it *TodoDTO)
    Update(it *TodoDTO)
    Delete(it *TodoDTO)

    GetAll() []*TodoDTO
}

tEventBus.go

事件总线的实现

package todo_app

import (
    "learning/gooop/saga/mqs/logger"
    "sync"
)

type tEventBus struct {
    rwmutex *sync.RWMutex
    items   map[string][]*EventHandler
}

func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
    return &EventHandler{
        id, handleFunc,
    }
}

func newEventBus() IEventBus {
    it := new(tEventBus)
    it.init()
    return it
}

func (me *tEventBus) init() {
    me.rwmutex = new(sync.RWMutex)
    me.items = make(map[string][]*EventHandler)
}

func (me *tEventBus) Pub(e string, args interface{}) {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    handlers, ok := me.items[e]
    if ok {
        for _, it := range handlers {
            logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
            it.Handler(e, args)
        }
    }
}

func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    handler := newEventHandler(id, handleFunc)
    handlers, ok := me.items[e]

    if ok {
        me.items[e] = append(handlers, handler)
    } else {
        me.items[e] = []*EventHandler{handler}
    }
}

func (me *tEventBus) Unsub(e string, id string) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    handlers, ok := me.items[e]
    if ok {
        for i, it := range handlers {
            if it.ID == id {
                lastI := len(handlers) - 1
                if i != lastI {
                    handlers[i], handlers[lastI] = handlers[lastI], handlers[i]
                }
                me.items[e] = handlers[:lastI]
            }
        }
    }
}

var GlobalEventBus = newEventBus()

tTodoEventSerializer.go

事件序列化到JSON的实现

package todo_app

type tTodoEventSerializer struct {
}

func newEventSeiralizer() iTodoEventSerializer {
    it := new(tTodoEventSerializer)
    return it
}

func (me *tTodoEventSerializer) Serialize(e *TodoEvent) *tJSONData {
    it := new(tJSONData)
    err := it.Set(TodoEventClass, e)
    if err != nil {
        return nil
    }
    return it
}

var gDefaultEventSerializer = newEventSeiralizer()

tTodoWriter.go

事件写入器的实现, 监听write指令, 并持久化到json存储

package todo_app

import (
    "fmt"
    "sync/atomic"
)

type tTodoWriter struct {
    id string
}

func newTodoWriter() iTodoWriter {
    it := new(tTodoWriter)
    it.init()
    return it
}

func (me *tTodoWriter) init() {
    me.id = fmt.Sprintf("tTodoWriter.%d", atomic.AddInt32(&gWriterCounter, 1))

    GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleWriteTodoCmd)
}

func (me *tTodoWriter) handleWriteTodoCmd(e string, args interface{}) {
    switch e {
    case EventWriteTodoCmd:
        if it, ok := args.(*TodoEvent); ok {
            me.HandleTodoEvent(it)
        }
        break
    }
}

func (me *tTodoWriter) HandleTodoEvent(e *TodoEvent) {
    j := gDefaultEventSerializer.Serialize(e)
    if j != nil {
        MockJSONStore.Append(j)
    }
}

var gWriterCounter int32 = 0

tMockJSONStore.go

虚拟的JSON文件读写实现

package todo_app

import (
    "fmt"
    "learning/gooop/saga/mqs/logger"
    "strings"
    "sync"
)

type tMockJSONStore struct {
    rwmutex *sync.RWMutex
    once    sync.Once
    items   []*tJSONData
}

func newMockJSONStore() iJSONStore {
    it := new(tMockJSONStore)
    it.init()
    return it
}

func (me *tMockJSONStore) init() {
    me.rwmutex = new(sync.RWMutex)
    me.items = []*tJSONData{}
}

func (me *tMockJSONStore) Load() {
    me.once.Do(func() {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()

        for _, it := range me.items {
            switch it.Tag {
            case TodoEventClass:
                v := new(TodoEvent)
                e := it.Get(v)
                if e == nil {
                    GlobalEventBus.Pub(EventLoadTodoCmd, e)
                }
                break
            }
        }
    })
}

func (me *tMockJSONStore) Append(it *tJSONData) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    me.items = append(me.items, it)

    lines := []string{}
    for _,it := range me.items {
        lines = append(lines, fmt.Sprintf("%s", string(it.Content)))
    }
    logger.Logf("tMockJSONStore.items: %s", strings.Join(lines, ", "))
}

var MockJSONStore = newMockJSONStore()

tTodoReader.go

待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态

package todo_app

import (
    "fmt"
    "learning/gooop/saga/mqs/logger"
    "strings"
    "sync"
    "sync/atomic"
)

type tTodoReader struct {
    id string
    rwmutex *sync.RWMutex
    items []*TodoDTO
}

func newTodoReader() iTodoReader {
    it := new(tTodoReader)
    it.init()
    return it
}

func (me *tTodoReader) init() {
    id := fmt.Sprintf("tTodoReader.%d", atomic.AddInt32(&gReaderCounter, 1))
    me.id = id
    me.rwmutex = new(sync.RWMutex)

    GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleEvent)
    GlobalEventBus.Sub(EventLoadTodoCmd, me.id, me.handleEvent)
    GlobalEventBus.Sub(EventReadTodoCmd, me.id, me.handleEvent)
}


func (me *tTodoReader) handleEvent(e string, args interface{}) {
    switch e {
    case EventWriteTodoCmd:
        fallthrough
    case EventLoadTodoCmd:
        if v,ok := args.(*TodoEvent);ok {
            me.HandleTodoEvent(v)
        }
        break

    case EventReadTodoCmd:
        me.handleReadTodoList()
    }
}

func (me *tTodoReader) handleReadTodoList() {
    GlobalEventBus.Pub(EventReadTodoRet, me.All())
}

func (me *tTodoReader) All() []*TodoDTO {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    lst := make([]*TodoDTO, len(me.items))
    for i,it := range me.items {
        lst[i] = it
    }
    return lst
}

func (me *tTodoReader) HandleTodoEvent(e *TodoEvent) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    switch e.Tag {
    case OPCreated:
        me.items = append(me.items, e.Data.Clone())
        break

    case OPUpdated:
        for i,it := range me.items {
            if it.NO == e.Data.NO {
                me.items[i] = e.Data.Clone()
                break
            }
        }
        break

    case OPDeleted:
        for i,it := range me.items {
            if it.NO == e.Data.NO {
                lastI := len(me.items) - 1
                if i == lastI {
                    me.items[i] = nil
                } else {
                    me.items[i], me.items[lastI] = me.items[lastI], nil
                }

                me.items = me.items[:lastI]
                break
            }
        }
        break
    }

    lines := []string{}
    for _,it := range me.items {
        lines = append(lines, fmt.Sprintf("%v", it))
    }
    logger.Logf("tTodoReader.items: [%s]", strings.Join(lines, ", "))
}

var gReaderCounter int32 = 1

tMockTodoService.go

待办事宜服务的实现, 提供todo项的CRUD

package todo_app

type tMockTodoService struct {
    items []*TodoDTO
    writer iTodoWriter
    reader iTodoReader
}

func newMockTodoService() ITodoService {
    it := new(tMockTodoService)
    it.init()
    return it
}

func (me *tMockTodoService) init() {
    me.writer = newTodoWriter()
    me.reader = newTodoReader()

    GlobalEventBus.Sub(EventReadTodoRet, "tMockTodoService", me.handleReadTodoRet)
}

func (me *tMockTodoService) handleReadTodoRet(e string, args interface{}) {
    switch e {
    case EventReadTodoRet:
        if it,ok := args.([]*TodoDTO);ok {
            me.items = it
        }
        break
    }
}

func (me *tMockTodoService) Create(it *TodoDTO) {
    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPCreated, it.Clone() })
}

func (me *tMockTodoService) Update(it *TodoDTO) {
    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPUpdated, it.Clone() })
}

func (me *tMockTodoService) Delete(it *TodoDTO) {
    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPDeleted, it.Clone() })
}

func (me *tMockTodoService) GetAll() []*TodoDTO {
    me.items = nil
    GlobalEventBus.Pub(EventReadTodoCmd, nil)

    lst := me.items
    me.items = nil
    return lst
}

var MockTodoService = newMockTodoService()

(ES-CQRS end)

以上是关于手撸golang GO与微服务 ES-CQRS模式之2的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang GO与微服务 聚合模式之1

手撸golang GO与微服务 聚合模式之2

手撸golang GO与微服务 Saga模式之8 集成测试

手撸golang GO与微服务 Saga模式之1

手撸golang GO与微服务 Saga模式之7

手撸golang GO与微服务 Saga模式之5