用 Go 实现 TCP 连接的双向拷贝

Posted 51reboot运维开发

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用 Go 实现 TCP 连接的双向拷贝相关的知识,希望对你有一定的参考价值。


本文主要给大家介绍了关于 Golang 实现 TCP 连接的双向拷贝的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。


最简单的实现


每次来一个 Server 的连接,就新开一个 Client 的连接。用一个 goroutine 从 server 拷贝到 client,再用另外一个 goroutine 从 client 拷贝到 server。任何一方断开连接,双向都断开连接。

func main() {    runtime.GOMAXPROCS(1)    listener, err := net.Listen("tcp", "127.0.0.1:8848")    if err != nil {        panic(err)    }    for {        conn, err := listener.Accept()        if err != nil {            panic(err)        }        go handle(conn.(*net.TCPConn))    } } func handle(server *net.TCPConn) {    defer server.Close()    client, err := net.Dial("tcp", "127.0.0.1:8849")    if err != nil {        fmt.Print(err)        return    }    defer client.Close()    go func() {        defer server.Close()        defer client.Close()        buf := make([]byte, 2048)        io.CopyBuffer(server, client, buf)    }()    buf := make([]byte, 2048)    io.CopyBuffer(client, server, buf) }

一个值得注意的地方是 io.Copy 的默认 buffer 比较大,给一个小的 buffer 可以支持更多的并发连接。


这两个 goroutine 并序在一个退出之后,另外一个也退出。这个的实现是通过关闭 server 或者 client 的 socket 来实现的。因为 socket 被关闭了,io.CopyBuffer 就会退出。


Client 端实现连接池


一个显而易见的问题是,每次 Server 的连接进来之后都需要临时去建立一个新的 Client 的端的连接。这样在代理的总耗时里就包括了一个 tcp 连接的握手时间。如果能够让 Client 端实现连接池复用已有连接的话,可以缩短端到端的延迟。

var pool = make(chan net.Conn, 100)

func borrow() (net.Conn, error) {
    select {
    case conn := <- pool:
        return conn, nil
    default:
        return net.Dial("tcp", "127.0.0.1:8849")
    }
}

func release(conn net.Conn) error {
    select {
    case pool <- conn:
        // returned to pool
        return nil
    default:
        // pool is overflow
        return conn.Close()
    }
}

func handle(server *net.TCPConn) {
    defer server.Close()
    client, err := borrow()
    if err != nil {
        fmt.Print(err)
        return
    }
    defer release(client)
    go func() {
        defer server.Close()
        defer release(client)
        buf := make([]byte, 2048)
        io.CopyBuffer(server, client, buf)
    }()
    buf := make([]byte, 2048)
    io.CopyBuffer(client, server, buf)
}

这个版本的实现是显而易见有问题的。因为连接在归还到池里的时候并不能保证是还保持连接的状态。另外一个更严重的问题是,因为 client 的连接不再被关闭了,当 server 端关闭连接时,从 client 向 server 做 io.CopyBuffer 的goroutine 就无法退出了。


所以,有以下几个问题要解决:


  • 如何在一个 goroutine 时退出时另外一个 goroutine 也退出?

  • 怎么保证归还给 pool 的连接是有效的?

  • 怎么保持在 pool 中的连接仍然是一直有效的?


通过 SetDeadine 中断 Goroutine


一个普遍的观点是 Goroutine 是无法被中断的。当一个 Goroutine 在做conn.Read 时,这个协程就被阻塞在那里了。实际上并不是毫无办法的,我们可以通过 conn.Close 来中断 Goroutine。但是在连接池的情况下,又无法Close 链接。另外一种做法就是通过 SetDeadline 为一个过去的时间戳来中断当前正在进行的阻塞读或者阻塞写。

var pool = make(chan net.Conn, 100) type client struct {    conn net.Conn    inUse *sync.WaitGroup } func borrow() (clt *client, err error) {    var conn net.Conn    select {    case conn = <- pool:    default:        conn, err = net.Dial("tcp", "127.0.0.1:18849")    }    if err != nil {        return nil, err    }    clt = &client{        conn: conn,        inUse: &sync.WaitGroup{},    }    return } func release(clt *client) error {    clt.conn.SetDeadline(time.Now().Add(-time.Second))    clt.inUse.Done()    clt.inUse.Wait()    select {    case pool <- clt.conn:        // returned to pool        return nil    default:        // pool is overflow        return clt.conn.Close()    } } func handle(server *net.TCPConn) {    defer server.Close()    clt, err := borrow()    if err != nil {        fmt.Print(err)        return    }    clt.inUse.Add(1)    defer release(clt)    go func() {        clt.inUse.Add(1)        defer server.Close()        defer release(clt)        buf := make([]byte, 2048)        io.CopyBuffer(server, clt.conn, buf)    }()    buf := make([]byte, 2048)    io.CopyBuffer(clt.conn, server, buf) }

通过 SetDeadline 实现了 goroutine 的中断,然后通过 sync.WaitGroup 来保证这些使用方都退出了之后再归还给连接池。否则一个连接被复用的时候,之前的使用方可能还没有退出。


连接有效性


为了保证在归还给 pool 之前,连接仍然是有效的。连接在被读写的过程中如果发现了 error,我们就要标记这个连接是有问题的,会释放之后直接 close掉。但是 SetDeadline 必然会导致读取或者写入的时候出现一次 timeout 的错误,所以还需要把 timeout 排除掉。

var pool = make(chan net.Conn, 100) type client struct {    conn net.Conn    inUse *sync.WaitGroup    isValid int32 } const maybeValid = 0 const isValid = 1 const isInvalid = 2 func (clt *client) Read(b []byte) (n int, err error) {    n, err = clt.conn.Read(b)    if err != nil {        if !isTimeoutError(err) {            atomic.StoreInt32(&clt.isValid, isInvalid)        }    } else {        atomic.StoreInt32(&clt.isValid, isValid)    }    return } func (clt *client) Write(b []byte) (n int, err error) {    n, err = clt.conn.Write(b)    if err != nil {        if !isTimeoutError(err) {            atomic.StoreInt32(&clt.isValid, isInvalid)        }    } else {        atomic.StoreInt32(&clt.isValid, isValid)    }    return } type timeoutErr interface {    Timeout() bool } func isTimeoutError(err error) bool {    timeoutErr, _ := err.(timeoutErr)    if timeoutErr == nil {        return false    }    return timeoutErr.Timeout() } func borrow() (clt *client, err error) {    var conn net.Conn    select {    case conn = <- pool:    default:        conn, err = net.Dial("tcp", "127.0.0.1:18849")    }    if err != nil {        return nil, err    }    clt = &client{        conn: conn,        inUse: &sync.WaitGroup{},        isValid: maybeValid,    }    return } func release(clt *client) error {    clt.conn.SetDeadline(time.Now().Add(-time.Second))    clt.inUse.Done()    clt.inUse.Wait()    if clt.isValid == isValid {        return clt.conn.Close()    }    select {    case pool <- clt.conn:        // returned to pool        return nil    default:        // pool is overflow        return clt.conn.Close()    } } func handle(server *net.TCPConn) {    defer server.Close()    clt, err := borrow()    if err != nil {        fmt.Print(err)        return    }    clt.inUse.Add(1)    defer release(clt)    go func() {        clt.inUse.Add(1)        defer server.Close()        defer release(clt)        buf := make([]byte, 2048)        io.CopyBuffer(server, clt, buf)    }()    buf := make([]byte, 2048)    io.CopyBuffer(clt, server, buf) }

判断 error 是否是 timeout 需要类型强转来实现。


对于连接池里的 conn 是否仍然是有效的,如果用后台不断 ping 的方式来实现成本比较高。因为不同的协议要连接保持需要不同的 ping 的方式。一个最简单的办法就是下次用的时候试一下。如果连接不好用了,则改成新建一个连接,避免连续拿到无效的连接。通过这种方式把无效的连接给淘汰掉。


原文链接:https://studygolang.com/articles/11167

转载|Go语言中文网



Golang 实战班第2期火热报名进行中


招生要求:


有 Linux 基础,有志于使用 Go 语言做分布式系统编程的人员,想往系统架构师方向发展的同学。BAT 架构师带你一起飞。


课程内容:


  • Golang入门

  • Golang程序结构

  • Golang的基础数据类型

  • Golang复合数据类型

  • Golang的函数

  • Golang的方法

  • Golang的接口

  • Golang的协程和Channel

  • Golang基于共享变量的并发

  • Golang包和工具


上课模式:网络直播班    线下面授班


咨询报名联系:

QQ(1):979950755    小月   

QQ(2):279312229    ada   

WeChat : 1902433859   小月

WeChat : 1251743084   小单


开课时间10月14日(周六)


课程大纲http://51reboot.com/course/go/

(阅读原文,即可跳转)



以上是关于用 Go 实现 TCP 连接的双向拷贝的主要内容,如果未能解决你的问题,请参考以下文章

Go: WebSockets单元测试

Go语言实战简简单单的几十行代码实现 TCP 通信

[Go] 测试go连接imap的tcp长连接

怎样用python3.4建立一个双向通信的,监听10用户的socket服务器

Android:通过MQTT实现用户端与服务器数据双向传输

初识socket