处理多个 websocket 连接
Posted
技术标签:
【中文标题】处理多个 websocket 连接【英文标题】:Handling multiple websocket connections 【发布时间】:2019-06-13 19:51:04 【问题描述】:我正在尝试创建一个程序,该程序将通过 gorilla web-sockets 连接到多个服务器。我目前有一个程序,它将遍历服务器地址列表并创建一个新的 goroutine,它将创建自己的 Websocket.conn 并处理读取和写入。
问题是每次创建一个新的 goroutine 时,之前的 goroutine 都会被阻塞,只有最后一个可以继续。我相信这是因为 gorilla websocket 库阻塞了每个 gorotutine,但我可能弄错了。
我尝试在服务器列表迭代器中放置一个计时器,每个 goroutine 都可以正常工作,但是当使用另一个地址创建一个新的 goroutine 时,前一个 goroutine 被阻塞。
我的代码的相关位:
在我的main.go
for _, server := range servers
go control(ctx, server, port)
在control()
func control(ctx context.Context, server, port string)
url := url.URL
Scheme: "ws",
Host: server + ":" + port,
Path: "",
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil
panic(err)
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)
readHandler(ctx context.Context, conn *websocket.Con)
for
_, p, err := conn.ReadMessage(); if err != nil
panic(err)
select
case <-ctx.Done():
goto TERM
default:
// do nothing
TERM:
// do termination
sendHandler(ctx context.Context, conn *websocket.Con)
for _, msg := range msges
err = conn.WriteMessage(websocket.TextMessage, msg)
if err != nil
panic(err)
<-ctx.Done()
我删除了添加等待组和其他不必要代码的部分。
所以我期望有 3n 个 goroutines 运行(其中 n 是服务器的数量)而不会阻塞,但现在我看到只有 3 个 goroutines 在运行,它们是服务器列表的最后一次迭代调用的那些。
谢谢!
编辑 2019 年 6 月 14 日:
我花了一些时间制作一个小的工作示例,在示例中没有出现错误 - 没有一个线程相互阻塞。我仍然不确定是什么原因造成的,但这是我的小型工作示例:
main.go
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
)
func main()
servers := []string"5555","5556", "5557"
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for _, server := range servers
wg.Add(1)
go control(server,
ctx,
&wg)
<-comms
cancel()
wg.Wait()
func control(server string, ctx context.Context, wg *sync.WaitGroup)
fmt.Printf("Started control for %s\n", server)
url := url.URL
Scheme: "ws",
Host: "0.0.0.0" + ":" + server,
Path: "",
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil
panic(err)
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go sendHandler(ctx, conn, &localwg, server)
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<- ctx.Done()
localwg.Wait()
wg.Done()
return
func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string)
for i := 0; i < 50; i++
err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
if err != nil
panic(err)
fmt.Printf("sent msg to %s\n", server)
time.Sleep(1 * time.Second)
<- ctx.Done()
wg.Done()
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string)
for
select
case <- ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil
wg.Done()
fmt.Println("done")
fmt.Printf("Got [%s] from %s\n", string(p), server)
我分别在 5555、5556 和 5557 上的服务器上使用 dpallot 的 simple-websocket-server 对其进行了测试。
【问题讨论】:
sendHandler
和 readHandler
没有显示,所以,谁知道呢,可能是任何东西。我知道,既然你 defer conn.Close()
然后在立即返回(从而关闭连接)之前启动两个使用 conn
的 goroutine,那么这些函数将根本无法使用 conn
。
如帖子中所述,我有等待组,可以防止 defer conn.Close()
在 sendHandler
和 ReadHandler
终止之前运行。我可以添加sendHandler
和ReadHandler
,但它们是非常简单的函数。
这些不是代码中唯一相关的部分。正确的minimal reproducible example 将允许某人按照描述复制问题
问题中的程序将在其中一个处理函数中出现恐慌而退出。发布实际代码。大猩猩连接是相互独立的。一个连接上的方法调用不会导致其他连接阻塞。
添加了一个最小的可重现示例,它没有相同的阻塞问题。我对错误是什么并不完全有信心,但感谢您告诉我做这个例子! ^
【参考方案1】:
这部分代码导致了问题:
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil
panic(err)
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)
你创建连接,延迟关闭,启动另外两个 goroutine,然后结束函数。由于您的延迟,函数 end 关闭了套接字。
【讨论】:
我删除了defer conn.Close()
并且同样的事情发生了,只要一个新的 goroutine 创建一个新的 websocket 另一个阻塞。我正在尝试获取一个小示例,但它有点具有挑战性,因为我需要找到一些示例服务器。以上是关于处理多个 websocket 连接的主要内容,如果未能解决你的问题,请参考以下文章