处理多个 websocket 连接

Posted

技术标签:

【中文标题】处理多个 websocket 连接【英文标题】:Handling multiple websocket connections 【发布时间】:2019-06-13 19:51:04 【问题描述】:

我正在尝试创建一个程序,该程序将通过 gorilla web-sockets 连接到多个服务器。我目前有一个程序,它将遍历服务器地址列表并创建一个新的 goroutine,它将创建自己的 Websocket.conn 并处理读取和写入。

问题是每次创建一个新的 goroutine 时,之前的 goroutine 都会被阻塞,只有最后一个可以继续。我相信这是因为 gorilla websocket 库阻塞了每个 gorotutine,但我可能弄错了。

我尝试在服务器列表迭代器中放置一个计时器,每个 goroutine 都可以正常工作,但是当使用另一个地址创建一个新的 goroutine 时,前一个 goroutine 被阻塞。

我的代码的相关位:

在我的main.go

for _, server := range servers 
  go control(ctx, server, port)

control()


func control(ctx context.Context, server, port string)  
  url := url.URL
    Scheme: "ws",
    Host: server + ":" + port,
    Path: "",
  
  conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
  if err != nil 
    panic(err)
  
  defer conn.Close()
  go sendHandler(ctx, conn)
  go readHandler(ctx, conn)


readHandler(ctx context.Context, conn *websocket.Con) 
  for 
    _, p, err := conn.ReadMessage(); if err != nil 
      panic(err)
    
    select 
      case <-ctx.Done():
        goto TERM
      default:
        // do nothing
    
  
  TERM:
  // do termination  


sendHandler(ctx context.Context, conn *websocket.Con) 
  for _, msg := range msges 
    err = conn.WriteMessage(websocket.TextMessage, msg)
    if err != nil 
      panic(err)
    
  
  <-ctx.Done()

我删除了添加等待组和其他不必要代码的部分。

所以我期望有 3n 个 goroutines 运行(其中 n 是服务器的数量)而不会阻塞,但现在我看到只有 3 个 goroutines 在运行,它们是服务器列表的最后一次迭代调用的那些。

谢谢!

编辑 2019 年 6 月 14 日:

我花了一些时间制作一个小的工作示例,在示例中没有出现错误 - 没有一个线程相互阻塞。我仍然不确定是什么原因造成的,但这是我的小型工作示例:

main.go

package main

import (
    "context"
    "fmt"
    "os"
    "time"
    "os/signal"
    "syscall"
    "sync"
    "net/url"
    "github.com/gorilla/websocket"
    )

func main() 
    servers := []string"5555","5556", "5557"
    comms := make(chan os.Signal, 1)
    signal.Notify(comms, os.Interrupt, syscall.SIGTERM)

    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    var wg sync.WaitGroup

    for _, server := range servers 
        wg.Add(1)
        go control(server,
                   ctx,
                   &wg)
    

    <-comms
    cancel()
    wg.Wait()


func control(server string, ctx context.Context, wg *sync.WaitGroup) 
    fmt.Printf("Started control for %s\n", server)
    url := url.URL 
        Scheme: "ws",
        Host: "0.0.0.0" + ":" + server,
        Path: "",
    
    conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
    if err != nil 
        panic(err)
    
    defer conn.Close()

    var localwg sync.WaitGroup

    localwg.Add(1)
    go sendHandler(ctx, conn, &localwg, server)
    localwg.Add(1)
    go readHandler(ctx, conn, &localwg, server)

    <- ctx.Done()
    localwg.Wait()
    wg.Done()
    return


func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) 
    for i := 0; i < 50; i++ 
        err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
        if err != nil 
            panic(err)
        
        fmt.Printf("sent msg to %s\n", server)
        time.Sleep(1 * time.Second)
    
    <- ctx.Done()
    wg.Done()


func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) 
    for 

        select 

            case <- ctx.Done():
                wg.Done()
                return
            default:
                _, p, err :=  conn.ReadMessage()
                if err != nil 
                    wg.Done()
                    fmt.Println("done")
                
                fmt.Printf("Got [%s] from %s\n", string(p), server)
        
    

我分别在 5555、5556 和 5557 上的服务器上使用 dpallot 的 simple-websocket-server 对其进行了测试。

【问题讨论】:

sendHandlerreadHandler 没有显示,所以,谁知道呢,可能是任何东西。我知道,既然你 defer conn.Close() 然后在立即返回(从而关闭连接)之前启动两个使用 conn 的 goroutine,那么这些函数将根本无法使用 conn 如帖子中所述,我有等待组,可以防止 defer conn.Close()sendHandlerReadHandler 终止之前运行。我可以添加sendHandlerReadHandler,但它们是非常简单的函数。 这些不是代码中唯一相关的部分。正确的minimal reproducible example 将允许某人按照描述复制问题 问题中的程序将在其中一个处理函数中出现恐慌而退出。发布实际代码。大猩猩连接是相互独立的。一个连接上的方法调用不会导致其他连接阻塞。 添加了一个最小的可重现示例,它没有相同的阻塞问题。我对错误是什么并不完全有信心,但感谢您告诉我做这个例子! ^ 【参考方案1】:

这部分代码导致了问题:

conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil 
    panic(err)

defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)

你创建连接,延迟关闭,启动另外两个 goroutine,然后结束函数。由于您的延迟,函数 end 关闭了套接字。

【讨论】:

我删除了 defer conn.Close() 并且同样的事情发生了,只要一个新的 goroutine 创建一个新的 websocket 另一个阻塞。我正在尝试获取一个小示例,但它有点具有挑战性,因为我需要找到一些示例服务器。

以上是关于处理多个 websocket 连接的主要内容,如果未能解决你的问题,请参考以下文章

websocket实现心跳连接

如何忽略所有与 mitmproxy 的 websocket 连接?

015_NGINX作为WebSocket Proxy的设置

实现单台测试机6万websocket长连接

实现单台测试机6万websocket长连接

websocke