golang中如何释放websocket和redis网关服务器资源?

Posted

技术标签:

【中文标题】golang中如何释放websocket和redis网关服务器资源?【英文标题】:How to release a websocket and redis gateway server resource in golang? 【发布时间】:2019-04-12 07:06:36 【问题描述】:

我有一个网关服务器,它可以使用 websocket 将消息推送到客户端,一个新的客户端连接到我的服务器,我将为它生成一个cid。然后我还订阅了一个使用cid 的频道。如果有任何消息发布到此频道,我的服务器会将其推送到客户端。目前,所有单元都工作正常,但是当我尝试通过thor 进行基准测试时,它会崩溃,我很好DeliverMessage 有一些问题,它永远不会退出,因为它有一个死循环。但是由于redis需要订阅一些东西,我不知道如何避免循环。

func (h *Hub) DeliverMessage(pool *redis.Pool) 
    conn := pool.Get()
    defer conn.Close()
    var gPubSubConn *redis.PubSubConn
    gPubSubConn = &redis.PubSubConnConn: conn
    defer gPubSubConn.Close()

    for 
        switch v := gPubSubConn.Receive().(type) 
        case redis.Message:
            // fmt.Printf("Channel=%q |  Data=%s\n", v.Channel, string(v.Data))
            h.Push(string(v.Data))
        case redis.Subscription:
            fmt.Printf("Subscription message: %s : %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            fmt.Println("Error pub/sub, delivery has stopped", v)
            panic("Error pub/sub")
        
    

在主函数中,我将上述函数调用为:

go h.DeliverMessage(pool)

但是当我用巨大的连接测试它时,它会给我一些错误,例如:

已达到 ERR 最大客户端数

所以,我通过更改 MaxIdle 来更改 redis 池大小:

func newPool(addr string) *redis.Pool 
    return &redis.Pool
        MaxIdle:     5000,
        IdleTimeout: 240 * time.Second,
        Dial:        func() (redis.Conn, error)  return redis.Dial("tcp", addr) ,
    

但它仍然不起作用,所以我想知道,在我的 websocket 在以下选择中与我的服务器断开连接后,是否有任何好的方法可以杀死 goroutine:

case client := <-h.Unregister:
    if _, ok := h.Clients[client]; ok 
        delete(h.Clients, client)
        delete(h.Connections, client.CID)
        close(client.Send)
        if err := gPubSubConn.Unsubscribe(client.CID); err != nil 
            panic(err)
        
        // TODO kill subscribe goroutine if don't client-side disconnected ...

    

但是我如何识别这个 goroutine?我怎么能像unix 那样做呢。 kill -9 &lt;PID&gt;?

【问题讨论】:

How to stop one of multilpe of the same goroutine的可能重复 看起来您正在使用conn := pool.Get() 建立连接?如果您多次运行go h.DeliverMessage(pool),那么我猜您可能会收到“达到最大连接数/客户端数”错误。或许你应该先conn := pool.Get() 看看是否有可用的连接(如果没有,可能会阻塞?),然后将连接传递给 goroutine 而不是整个池。 你可以看到我的代码有conn := pool.Get() defer conn.Close(),如果我没有得到一个池作为参数,它会在我交付后断开连接.. 【参考方案1】:

看例子here

一旦您没有收到更多信息,您可以通过在 DeliverMessage 的 switch case 中添加一个 return 语句来退出 goroutine。我猜case error,或者如示例中所见,case 0 你想从那里返回,你的 goroutine 将取消。或者,如果我对事情有误解,并且 case client := &lt;-h.Unregister: 在 DeliverMessage 中,请返回。

您还关闭了两次连接。 defer gPubSubConn.Close() 只需调用 conn.Close() 所以你不需要 defer conn.Close()

还可以查看the Pool 并查看所有参数的实际作用。如果要处理许多连接,请将 MaxActive 设置为 0 “当为零时,池中的连接数没有限制。” (你真的想要空闲超时吗?)

【讨论】:

【参考方案2】:

其实我弄错了设计架构,我来解释一下我想做什么。

    客户端可以连接到我的 websocket 服务器;

    服务器有多个http的handler,admin可以通过handler发布数据,数据的结构可以是:

    
       "cid": "something",
       "body": 
        
    
    

因为,我有几个节点正在运行来为我们的客户端提供服务,nginx 可以将来自admin 的每个请求分派到完全不同的节点,但是只有一个节点通过“某事”保持与cid 的连接,所以我需要把这个数据发布到Redis,如果有任何节点得到了数据,它就会把这个消息发送到客户端。

3.寻找 NodeID,我将通过给定 cid 将其转到 Publish

// redis code & golang
NodeID, err := conn.Do("HGET", "NODE_MAP", cid)

4.现在,我可以从admin发布任何消息,并发布到NodeID,这是我们在第3步得到的。

// redis code & golang
NodeID, err := conn.Do("PUBLISH", NodeID, data)

    是时候展示与这个问题相关的核心代码了。我要订阅一个频道,名字是 NodeID。像下面这样。

    go func()
      for 
        switch v := gPubSubConn.Receive().(type) 
        case redis.Message:
            fmt.Println("Got a message", v.Data)
            h.Broadcast <- v.Data
            pipeline <- v.Data
        case error:
            panic(v)
        
      
    ()
    

6.要管理您的 websocket,您还需要一个 goroutine 来执行此操作。像下面这样:

   go func () 
     for 
            select 
            case client := <-h.Register:
                h.Clients[client] = true
                cid := client.CID
                h.Connections[cid] = client

                body := "something"
                client.Send <- msg // greeting

            case client := <-h.Unregister:
                if _, ok := h.Clients[client]; ok 
                    delete(h.Clients, client)
                    delete(h.Connections, client.CID)
                    close(client.Send)
                
            case message := <-h.Broadcast:
                fmt.Println("message is", message)
            
        
      ()

    最后一件事是管理一个 redis 池,你现在并不真的需要一个连接池。因为我们只有两个goroutine,一个主进程。

    func newPool(addr string) *redis.Pool 
        return &redis.Pool
            MaxIdle:     100,
            IdleTimeout: 240 * time.Second,
            Dial:        func() (redis.Conn, error)  return redis.Dial("tcp", addr) ,
        
    
    
    var (
        pool        *redis.Pool
        redisServer = flag.String("redisServer", ":6379", "")
    )
    pool = newPool(*redisServer)
    conn := pool.Get()
    defer conn.Close()
    

【讨论】:

以上是关于golang中如何释放websocket和redis网关服务器资源?的主要内容,如果未能解决你的问题,请参考以下文章

node.js中如何配置redis与连接池?

发送连接升级后,如何在golang中将客户端http连接升级到websockets

golang 分布式锁

golang websocket

golang开发:类库篇 Redis连接池的使用

golang 如何连接redis --- 2022-04-03