golang--Channel有无缓存区别,以及关闭原则

Posted ivan-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang--Channel有无缓存区别,以及关闭原则相关的知识,希望对你有一定的参考价值。

有无缓存的区别

无缓存并不等价于缓存为1

func main(){
    ch := make(chan int)
    ch <- 1
}

这句话会报错,当向无缓存的chan放数据时,如果一直没有接收者,那么它会一直堵塞,直到有接收者。

无缓冲的 就是一个送信人去你家门口送信,你不在家他不走,你一定要接下信,他才会走。无缓冲保证信能到你手上有缓冲的 就是一个送信人去你家仍到你家的信箱转身就走 ,除非你的信箱满了 他必须等信箱空下来。有缓冲的 保证 信能进你家的邮箱

关闭channel

参考,这篇文章已经总结的很好

The Channel Closing Principle(channel 关闭原则):

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。

优雅方案

M个receivers,一个sender

sender通过关闭data channel说“不再发送”,这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 100
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    
    // the sender
    go func() {
        for {
            if value := rand.Intn(MaxRandomNumber); value == 0 {
                // the only sender can close the channel safely.
                close(dataCh)
                return
            } else {            
                dataCh <- value
            }
        }
    }()
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()
            
            // receive values until dataCh is closed and
            // the value buffer queue of dataCh is empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }
    
    wgReceivers.Wait()
}

这个例子好理解,sender发送消息,并当符合条件时关闭dataCh。reciver中,用for range接收消息直到dataCh关闭,每个协程输出完消息后用WaitGroup确认下Done。

注意这里reciver循环时,dataCh没有关闭,是边接收后来关闭了。sender有个return因为他是无限循环,然而reciver没有return因为它可以自己退出。 <-Ch可以不关闭,里面有数据时。

一个receiver,N个sender。

receiver通过关闭一个额外的signal channel说“请停止发送”这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.
        // Its reveivers are the senders of channel dataCh.
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                value := rand.Intn(MaxRandomNumber)
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }()
    }
    
    // the receiver
    go func() {
        defer wgReceivers.Done()
        
        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // the receiver of the dataCh channel is
                // also the sender of the stopCh cahnnel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }
            
            log.Println(value)
        }
    }()
    
    // ...
    wgReceivers.Wait()
}

注意这个例子没有关闭dataCh,sender里的无线循环时刻盯着stopCh,一旦它能返回来值就退出。reciver里,满足退出条件后,关闭stopCh,因为只有关闭了channel在(sender)无限循环里,stopCh才能返回值,正常情况下一个Channel没有关闭,无限循环会返回deadlock。

还要注意这里的reciver,关闭stopCh后直接return了,这样就还没有取到dataCh最后的值,也就不用关闭dataCh,但我觉得一般情况下需要在sender里面关闭。

还需注意,这两个例子的WaitGroup的Done都放在reciver里面,因为wg是为了保证程序运行结束,结束只有当reciver接收完才结束,而不是sender发送完结束,所以放在reciver里面判断waitgroup是否Done一个。

正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle。

判断sender的标准就是能否有权利关闭channel

M个receiver,N个sender

它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”,这是最复杂的场景了。

我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its reveivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // the channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its reveiver is the moderator goroutine shown below.
    
    var stoppedBy string
    
    // moderator
    go func() {
        stoppedBy = <- toStop // part of the trick used to notify the moderator
                              // to close the additional signal channel.
        close(stopCh)
    }()
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    // here, a trick is used to notify the moderator
                    // to close the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }
                
                // the first select here is to try to exit the
                // goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()
            
            for {
                // same as senders, the first select here is to 
                // try to exit the goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == MaxRandomNumber-1 {
                        // the same trick is used to notify the moderator 
                        // to close the additional signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }
                    
                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }
    
    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在这个例子中,仍然遵守着channel closing principle。

请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。

if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}

这里用select的目的是:

toStop的buffer只有1,如果多个同时发送给toStop的话,会导致阻塞在 toStop <- id,所以使用了select,这样子当不能发送的时候就知道已经有其他goroutine发送了信号了。其实也可以将toStop的buffer大小改成接收者和发送者数量之和,这样子就可以直接发送了。

// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case dataCh <- value:
}

这里写两次的目的:

为了提前知道channel是否已经关闭了,如果省略了这个select,有可能计算关闭了channel,也会执行发送操作,因为在一个select里面,是随机选择一个能执行的case来执行的

经验

不是所有channel都需要关闭的, 因为它完全遵循GC回收规则. 但是如果用channel来通知其他协程停止工作的话, 就需要用到关闭了. 典型的例子就是其他协程使用for xxx := range channel 这样的语句时, 如果不关闭channel的话, 这些代码会一直堵住

以上是关于golang--Channel有无缓存区别,以及关闭原则的主要内容,如果未能解决你的问题,请参考以下文章

Golang channel

golang channel本质——共享内存

golang - channel

golang channel 超时如何处理

golang channel tips

JavaScript找出两个一维数组中相同元素以及它们各自在所在数组中的下标有无return和return fasle区别