手撸golang GO与微服务 ChatServer之2

Posted ioly

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸golang GO与微服务 ChatServer之2相关的知识,希望对你有一定的参考价值。

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

案例需求(聊天服务器)

  • 用户可以连接到服务器。
  • 用户可以设定自己的用户名。
  • 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。

目标

  • 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
  • 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以防止死锁
  • 测试多个客户端的连入, 收发和断开, 并诊断服务端日志

设计

  • IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
  • IMsgDecoder: 定义消息解码器及其实现
  • IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
  • tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
  • IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
  • tChatServer: 实现聊天服务器IChatServer

单元测试

ChatServer_test.go

package chat_server

import (
    "fmt"
    cs "learning/gooop/chat_server"
    "strings"
    "testing"
    "time"
)

func Test_ChatServer(t *testing.T) {
    fnAssertTrue := func(b bool, msg string) {
        if !b {
            t.Fatal(msg)
        }
    }

    port := 3333
    server := cs.NewChatServer()
    err := server.Open(port)
    if err != nil {
        t.Fatal(err)
    }

    clientCount := 3
    address := fmt.Sprintf("localhost:%v", port)
    for i := 0;i < clientCount;i++ {
        err, client := cs.DialChatClient(address)
        if err != nil {
            t.Fatal(err)
        }

        id := fmt.Sprintf("c%02d", i)
        client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
            t.Logf("%v recv: %v\\n", id, msg)
        })

        go func() {
            client.SetName(id)
            client.Send(&cs.NameMsg{id })

            n := 0
            for range time.Tick(time.Duration(1) * time.Second) {
                client.Send(&cs.ChatMsg{id, fmt.Sprintf("msg %02d from %v", n, id) })

                n++
                if n >= 3 {
                    break
                }
            }

            client.Close()
        }()
    }

    passedSeconds := 0
    for range time.Tick(time.Second) {
        passedSeconds++
        t.Logf("%v seconds passed", passedSeconds)

        if passedSeconds >= 5 {
            break
        }
    }
    server.Close()

    logs := server.GetLogs()
    fnHasLog := func(log string) bool {
        for _,it := range logs {
            if strings.Contains(it, log) {
                return true
            }
        }
        return false
    }

    for i := 0;i < clientCount;i++ {
        msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i + 1)
        fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)

        msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
        fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)
    }
}

测试输出

$ go test -v ChatServer_test.go 
=== RUN   Test_ChatServer
tChatServer.handleIncomingConn, clientCount=1
tChatServer.handleIncomingConn, clientCount=2
tChatServer.handleIncomingConn, clientCount=3
    ChatServer_test.go:59: 1 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:59: 2 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
tChatClient.postConnClosed, c00, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=false
tChatClient.postConnClosed, c01, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=true
tChatServer.handleClientClosed, c02
tChatServer.handleClientClosed, c02, clientCount=2
tChatClient.postConnClosed, c01, serverFlag=true
tChatServer.handleClientClosed, c01
tChatServer.handleClientClosed, c01, clientCount=1
    ChatServer_test.go:59: 3 seconds passed
tChatClient.postConnClosed, c00, serverFlag=true
tChatServer.handleClientClosed, c00
tChatServer.handleClientClosed, c00, clientCount=0
    ChatServer_test.go:59: 4 seconds passed
    ChatServer_test.go:59: 5 seconds passed
--- PASS: Test_ChatServer (5.00s)
PASS
ok      command-line-arguments  5.003s

IMsg.go

定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码

package chat_server

import (
    "encoding/base64"
    "fmt"
)

type IMsg interface {
    Encode() string
}

type NameMsg struct {
    Name string
}

func (me *NameMsg) Encode() string {
    return fmt.Sprintf("NAME %s\\n", base64.StdEncoding.EncodeToString([]byte(me.Name)))
}

type ChatMsg struct {
    Name string
    Words string
}

func (me *ChatMsg) Encode() string {
    return fmt.Sprintf("CHAT %s %s\\n",
        base64.StdEncoding.EncodeToString([]byte(me.Name)),
        base64.StdEncoding.EncodeToString([]byte(me.Words)),
    )
}

IMsgDecoder.go

定义消息解码器及其实现

package chat_server

import (
    "encoding/base64"
    "strings"
)


type IMsgDecoder interface {
    Decode(line string) (bool, IMsg)
}

type tMsgDecoder struct {
}

func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
    items := strings.Split(line, " ")
    size := len(items)

    if items[0] == "NAME" && size == 2 {
        name, err := base64.StdEncoding.DecodeString(items[1])
        if err != nil {
            return false, nil
        }

        return true, &NameMsg{
            Name: string(name),
        }
    }

    if items[0] == "CHAT" && size == 3 {
        name, err := base64.StdEncoding.DecodeString(items[1])
        if err != nil {
            return false, nil
        }

        words, err := base64.StdEncoding.DecodeString(items[2])
        if err != nil {
            return false, nil
        }

        return true, &ChatMsg{
            Name: string(name),
            Words: string(words),
        }
    }

    return false, nil
}


var MsgDecoder = &tMsgDecoder{}

IChatClient.go

定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.

package chat_server

type IChatClient interface {
    GetName() string
    SetName(name string)

    Send(msg IMsg)
    RecvHandler(handler ClientRecvFunc)
    CloseHandler(handler ClientCloseFunc)

    Close()
}

type ClientRecvFunc func(client IChatClient, msg IMsg)
type ClientCloseFunc func(client IChatClient)

tChatClient.go

聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.

package chat_server

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "sync/atomic"
    "time"
)

type tChatClient struct {
    conn net.Conn
    name string
    openFlag int32
    closeFlag int32
    serverFlag bool

    closeChan chan bool
    sendChan chan IMsg

    sendLogs []IMsg
    dropLogs []IMsg
    recvLogs []IMsg
    pendingSend int32

    recvHandler ClientRecvFunc
    closeHandler ClientCloseFunc
}

var gMaxPendingSend int32 = 100

func DialChatClient(address string) (error, IChatClient) {
    conn, err := net.Dial("tcp", address)
    if err != nil {
        return err, nil
    }

    return nil, openChatClient(conn, false)
}

func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
    it := &tChatClient{
        conn: conn,
        openFlag: 0,
        closeFlag: 0,
        serverFlag: serverFlag,

        closeChan: make(chan bool),
        sendChan: make(chan IMsg, gMaxPendingSend),

        name: "anonymous",
        sendLogs: []IMsg{},
        dropLogs: []IMsg{},
        recvLogs: []IMsg{},
    }
    it.open()
    return it
}


func (me *tChatClient) GetName() string {
    return me.name
}

func (me *tChatClient) SetName(name string) {
    me.name = name
}

func (me *tChatClient) open(){
    if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
        return
    }

    go me.beginWrite()
    go me.beginRead()
}


func (me *tChatClient) isClosed() bool {
    return me.closeFlag != 0
}

func (me *tChatClient) isNotClosed() bool {
    return !me.isClosed()
}

func (me *tChatClient) Send(msg IMsg) {
    if me.isClosed() {
        return
    }

    if me.pendingSend < gMaxPendingSend {
        atomic.AddInt32(&me.pendingSend, 1)
        me.sendChan <- msg

    } else {
        me.dropLogs = append(me.dropLogs, msg)
    }
}

func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
    if me.isNotClosed() {
        me.recvHandler = handler
    }
}


func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
    if me.isNotClosed() {
        me.closeHandler = handler
    }
}


func (me *tChatClient) Close() {
    if me.isNotClosed() {
        me.closeConn()
    }
}

func (me *tChatClient) beginWrite() {
    writer := io.Writer(me.conn)
    for {
        select {
        case <- me.closeChan:
            _ = me.conn.Close()
            me.closeFlag = 2
            me.postConnClosed()
            return

        case msg := <- me.sendChan:
            atomic.AddInt32(&me.pendingSend, -1)
            _,e := writer.Write([]byte(msg.Encode()))
            if e != nil {
                me.closeConn()
                break
            } else {
                me.sendLogs = append(me.sendLogs, msg)
            }

        case <- time.After(time.Duration(10) * time.Second):
            me.postRecvTimeout()
            break
        }
    }
}

func (me *tChatClient) postRecvTimeout() {
    fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\\n", me.name, me.serverFlag)
    me.closeConn()
}

func (me *tChatClient) beginRead() {
    reader := bufio.NewReader(me.conn)
    for {
        line, err := reader.ReadString(\'\\n\')
        if err != nil {
            me.closeConn()
            break
        }

        ok, msg := MsgDecoder.Decode(line)
        if ok {
            fn := me.recvHandler
            if fn != nil {
                fn(me, msg)
            }

            me.recvLogs = append(me.recvLogs, msg)
        }
    }
}

func (me *tChatClient) closeConn() {
    if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
        return
    }
    me.closeChan <- true
}

func (me *tChatClient) postConnClosed() {
    fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\\n", me.name, me.serverFlag)

    handler := me.closeHandler
    if handler != nil {
        handler(me)
    }

    me.closeHandler = nil
    me.recvHandler = nil
}

IChatServer.go

定义聊天服务器接口, 为方便测试, 提供日志采集方法

package chat_server

type IChatServer interface {
    Open(port int) error
    Broadcast(msg IMsg)
    Close()
    GetLogs() []string
}

tChatServer.go

实现聊天服务器IChatServer

package chat_server

import (
    "errors"
    "fmt"
    "net"
    "sync"
    "sync/atomic"
)

type tChatServer struct {
    openFlag int32
    closeFlag int32

    clients []IChatClient
    clientCount int
    clientLock *sync.RWMutex

    listener net.Listener
    recvLogs []IMsg

    logs []string
}

func NewChatServer() IChatServer {
    it := &tChatServer{
        openFlag: 0,
        closeFlag: 0,

        clients: []IChatClient{},
        clientCount: 0,
        clientLock: new(sync.RWMutex),

        listener: nil,
        recvLogs: []IMsg{},
    }
    return it
}

func (me *tChatServer) Open(port int) error {
    if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
        return errors.New("server already opened")
    }

    listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
    if err != nil {
        return err
    }

    me.listener = listener
    go me.beginListening()
    return nil
}

func (me *tChatServer) logf(f string, args... interface{}) {
    msg := fmt.Sprintf(f, args...)
    me.logs = append(me.logs, msg)
    fmt.Println(msg)
}

func (me *tChatServer) GetLogs() []string {
    return me.logs
}

func (me *tChatServer) isClosed() bool {
    return me.closeFlag != 0
}

func (me *tChatServer) isNotClosed() bool {
    return !me.isClosed()
}

func (me *tChatServer) beginListening() {
    for !me.isClosed() {
        conn, err := me.listener.Accept()
        if err != nil {
            me.Close()
            break
        }

        me.handleIncomingConn(conn)
    }
}


func (me *tChatServer) Close() {
    if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
        return
    }

    _ = me.listener.Close()
    me.closeAllClients()
}

func (me *tChatServer) closeAllClients() {
    me.clientLock.Lock()
    defer me.clientLock.Unlock()

    for i,it := range me.clients {
        if it != nil {
            it.Close()
            me.clients[i] = nil
        }
    }
    me.clientCount = 0
}


func (me *tChatServer) handleIncomingConn(conn net.Conn) {
    // init client
    client := openChatClient(conn, true)
    client.RecvHandler(me.handleClientMsg)
    client.CloseHandler(me.handleClientClosed)

    // lock me.clients
    me.clientLock.Lock()
    defer me.clientLock.Unlock()

    // append to me.clients
    if len(me.clients) > me.clientCount {
        me.clients[me.clientCount] = client
    } else {
        me.clients = append(me.clients, client)
    }
    me.clientCount++

    me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
}

func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
    me.recvLogs = append(me.recvLogs, msg)

    if nameMsg,ok := msg.(*NameMsg);ok {
        client.SetName(nameMsg.Name)

    } else if _, ok := msg.(*ChatMsg);ok {
        me.Broadcast(msg)
    }
}

func (me *tChatServer) handleClientClosed(client IChatClient) {
    me.logf("tChatServer.handleClientClosed, %s", client.GetName())

    me.clientLock.Lock()
    defer me.clientLock.Unlock()

    if me.clientCount <= 0 {
        return
    }

    lastI := me.clientCount - 1
    for i,it := range me.clients {
        if it == client {
            if i == lastI {
                me.clients[i] = nil
            } else {
                me.clients[i], me.clients[lastI] = me.clients[lastI], nil
            }
            me.clientCount--
            break
        }
    }

    me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
}

func (me *tChatServer) Broadcast(msg IMsg) {
    me.clientLock.RLock()
    defer me.clientLock.RUnlock()

    for _,it := range me.clients {
        if it != nil {
            it.Send(msg)
        }
    }
}

(未完待续)

以上是关于手撸golang GO与微服务 ChatServer之2的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang GO与微服务 聚合模式之2

手撸golang GO与微服务 ChatServer之1

手撸golang GO与微服务 ChatServer之2

手撸golang GO与微服务 ES-CQRS模式之2

手撸golang GO与微服务 ES-CQRS模式之1

手撸golang GO与微服务 Saga模式之8 集成测试