golang中如何释放websocket和redis网关服务器资源?
Posted
技术标签:
【中文标题】golang中如何释放websocket和redis网关服务器资源?【英文标题】:How to release a websocket and redis gateway server resource in golang? 【发布时间】:2019-04-12 07:06:36 【问题描述】:我有一个网关服务器,它可以使用 websocket 将消息推送到客户端,一个新的客户端连接到我的服务器,我将为它生成一个cid
。然后我还订阅了一个使用cid
的频道。如果有任何消息发布到此频道,我的服务器会将其推送到客户端。目前,所有单元都工作正常,但是当我尝试通过thor 进行基准测试时,它会崩溃,我很好DeliverMessage
有一些问题,它永远不会退出,因为它有一个死循环。但是由于redis需要订阅一些东西,我不知道如何避免循环。
func (h *Hub) DeliverMessage(pool *redis.Pool)
conn := pool.Get()
defer conn.Close()
var gPubSubConn *redis.PubSubConn
gPubSubConn = &redis.PubSubConnConn: conn
defer gPubSubConn.Close()
for
switch v := gPubSubConn.Receive().(type)
case redis.Message:
// fmt.Printf("Channel=%q | Data=%s\n", v.Channel, string(v.Data))
h.Push(string(v.Data))
case redis.Subscription:
fmt.Printf("Subscription message: %s : %s %d\n", v.Channel, v.Kind, v.Count)
case error:
fmt.Println("Error pub/sub, delivery has stopped", v)
panic("Error pub/sub")
在主函数中,我将上述函数调用为:
go h.DeliverMessage(pool)
但是当我用巨大的连接测试它时,它会给我一些错误,例如:
已达到 ERR 最大客户端数
所以,我通过更改 MaxIdle
来更改 redis 池大小:
func newPool(addr string) *redis.Pool
return &redis.Pool
MaxIdle: 5000,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) return redis.Dial("tcp", addr) ,
但它仍然不起作用,所以我想知道,在我的 websocket 在以下选择中与我的服务器断开连接后,是否有任何好的方法可以杀死 goroutine:
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok
delete(h.Clients, client)
delete(h.Connections, client.CID)
close(client.Send)
if err := gPubSubConn.Unsubscribe(client.CID); err != nil
panic(err)
// TODO kill subscribe goroutine if don't client-side disconnected ...
但是我如何识别这个 goroutine?我怎么能像unix
那样做呢。 kill -9 <PID>
?
【问题讨论】:
How to stop one of multilpe of the same goroutine的可能重复 看起来您正在使用conn := pool.Get()
建立连接?如果您多次运行go h.DeliverMessage(pool)
,那么我猜您可能会收到“达到最大连接数/客户端数”错误。或许你应该先conn := pool.Get()
看看是否有可用的连接(如果没有,可能会阻塞?),然后将连接传递给 goroutine 而不是整个池。
你可以看到我的代码有conn := pool.Get() defer conn.Close()
,如果我没有得到一个池作为参数,它会在我交付后断开连接..
【参考方案1】:
看例子here
一旦您没有收到更多信息,您可以通过在 DeliverMessage 的 switch case 中添加一个 return 语句来退出 goroutine。我猜case error
,或者如示例中所见,case 0
你想从那里返回,你的 goroutine 将取消。或者,如果我对事情有误解,并且 case client := <-h.Unregister:
在 DeliverMessage 中,请返回。
您还关闭了两次连接。 defer gPubSubConn.Close()
只需调用 conn.Close() 所以你不需要 defer conn.Close()
还可以查看the Pool 并查看所有参数的实际作用。如果要处理许多连接,请将 MaxActive 设置为 0 “当为零时,池中的连接数没有限制。” (你真的想要空闲超时吗?)
【讨论】:
【参考方案2】:其实我弄错了设计架构,我来解释一下我想做什么。
客户端可以连接到我的 websocket 服务器;
服务器有多个http的handler,admin可以通过handler发布数据,数据的结构可以是:
"cid": "something",
"body":
因为,我有几个节点正在运行来为我们的客户端提供服务,nginx 可以将来自
admin
的每个请求分派到完全不同的节点,但是只有一个节点通过“某事”保持与cid
的连接,所以我需要把这个数据发布到Redis
,如果有任何节点得到了数据,它就会把这个消息发送到客户端。
3.寻找 NodeID,我将通过给定 cid 将其转到 Publish
。
// redis code & golang
NodeID, err := conn.Do("HGET", "NODE_MAP", cid)
4.现在,我可以从admin
发布任何消息,并发布到NodeID
,这是我们在第3步得到的。
// redis code & golang
NodeID, err := conn.Do("PUBLISH", NodeID, data)
是时候展示与这个问题相关的核心代码了。我要订阅一个频道,名字是 NodeID。像下面这样。
go func()
for
switch v := gPubSubConn.Receive().(type)
case redis.Message:
fmt.Println("Got a message", v.Data)
h.Broadcast <- v.Data
pipeline <- v.Data
case error:
panic(v)
()
6.要管理您的 websocket,您还需要一个 goroutine 来执行此操作。像下面这样:
go func ()
for
select
case client := <-h.Register:
h.Clients[client] = true
cid := client.CID
h.Connections[cid] = client
body := "something"
client.Send <- msg // greeting
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok
delete(h.Clients, client)
delete(h.Connections, client.CID)
close(client.Send)
case message := <-h.Broadcast:
fmt.Println("message is", message)
()
最后一件事是管理一个 redis 池,你现在并不真的需要一个连接池。因为我们只有两个goroutine
,一个主进程。
func newPool(addr string) *redis.Pool
return &redis.Pool
MaxIdle: 100,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) return redis.Dial("tcp", addr) ,
var (
pool *redis.Pool
redisServer = flag.String("redisServer", ":6379", "")
)
pool = newPool(*redisServer)
conn := pool.Get()
defer conn.Close()
【讨论】:
以上是关于golang中如何释放websocket和redis网关服务器资源?的主要内容,如果未能解决你的问题,请参考以下文章