[11][go] go concurrency

Posted WhateverYoung

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[11][go] go concurrency相关的知识,希望对你有一定的参考价值。

绝妙的 CHANNEL

channel1

  • channel提供了一种强大的、在不使用锁或临界区的情况下,从某个 goroutine 向其他 goroutine 发送数据流的方法。
  • 发送者才能关闭channel

一个已经被关闭的 channel 永远都不会阻塞

  • 已经被关闭的 channel。当一个 channel 一旦被关闭,就不能再向这个 channel 发送数据,不过你仍然可以尝试从 channel 中获取值。能够检测 channel 是否关闭是一个很有用的特性,可用于对 channel 进行 range 操作,并且当 channel 清空后退出循环。
  • 上述其真正的价值是与 select 联合时体现的,场景是不知道有多少个go程在等待一个channel时,此时发送者关闭finish标志位channel,可以关闭上述所有channel且不阻塞。这个强大的机制在无需知道未知数量的 goroutine 的任何细节而向它们发送信号而成为可能,同时也不用担心死锁。
package main
 
import "fmt"
 
func main() 
        ch := make(chan bool, 2)
        ch <- true
        ch <- true
        close(ch)
 
        for i := 0; i < cap(ch) +1 ; i++ 
                v, ok := <- ch
                fmt.Println(v, ok)
        


true true
true true
false false

func main() 
        ch := make(chan bool, 2)
        ch <- true
        ch <- true
        close(ch)
 
        for v := range ch 
                fmt.Println(v) // 被调用两次
        



func main() 
        finish := make(chan bool)
        var done sync.WaitGroup
        done.Add(1)
        go func() 
                select 
                case <-time.After(1 * time.Hour):
                case <-finish:
                
                done.Done()
        ()
        t0 := time.Now()
        finish <- true // 发送关闭信号
        done.Wait()    // 等待 goroutine 结束
        fmt.Printf("Waited %v for goroutine to stop\\n", time.Since(t0))

有可能出问题的地方,第一,finish <- true // 发送关闭信号有可能阻塞;第二,如果多个go listen finish,且无法控制具体多少个的时候;前者可以通过缓冲防止阻塞或者select 带default防止阻塞。最好的方式是,利用关闭的channel不会阻塞,切回返回false这一特性,如下,是否需要Wait取决于是否需要等待所有go程返回?
func main() 
        const n = 100
        finish := make(chan bool)
        var done sync.WaitGroup
        for i := 0; i < n; i++ 
                done.Add(1)
                go func() 
                        select 
                        case <-time.After(1 * time.Hour):
                        case <-finish:
                        
                        done.Done()
                ()
        
        t0 := time.Now()
        close(finish)    // 关闭 finish 使其立即返回
        done.Wait()      // 等待所有的 goroutine 结束
        fmt.Printf("Waited %v for %d goroutines to stop\\n", time.Since(t0), n)

当 close(finish) 依赖于关闭 channel 的消息机制,而没有数据收发时,将 finish 定义为 type chan struct 表示 channel 没有任何数据;只对其关闭的特性感兴趣。
func main() 
        finish := make(chan struct)
        var done sync.WaitGroup
        done.Add(1)
        go func() 
                select 
                case <-time.After(1 * time.Hour):
                case <-finish:
                
                done.Done()
        ()
        t0 := time.Now()
        close(finish)
        done.Wait()
        fmt.Printf("Waited %v for goroutine to stop\\n", time.Since(t0))


一个 nil channel 永远都是阻塞的

  • 当 channel 的值尚未进行初始化或赋值为 nil 是,永远都是阻塞的,发送和接收都是如此
  • 当使用已经关闭的 channel 机制来等待多个 channel 关闭的时候,这确实是一个很有用的特性。例如当 nil channel 是 select 语句的一部分时,它实际上会被忽略,因此,将 a 设置为 nil 便会将其从 select 中移除,仅仅留下 b 等待它被关闭,进而退出循环。
func main() 
        var ch chan bool
        ch <- true // 永远阻塞


func main() 
	var ch chan bool
	go func() 
		fmt.Println("1")
		ch <- true // 永远阻塞
		fmt.Println("2")
	()
	fmt.Println("sleep start")
	time.Sleep(3 * time.Second)
	fmt.Println("sleep finish")
	ch = make(chan bool, 1) // 也不生效,因为go程传递的ch是没初始化的ch
	time.Sleep(10 * time.Second)


func main() 
        var ch chan bool
        <- ch // 永远阻塞


// WaitMany 等待 a 和 b 关闭。
func WaitMany(a, b chan bool) 
        var aclosed, bclosed bool
        for !aclosed || !bclosed 
                select 
                case <-a:
                		    fmt.Printf("close b\\n")
                        aclosed = true
                case <-b:
                		    fmt.Printf("close b\\n")
                        bclosed = true
                
        

WaitMany() 用于等待 channel a 和 b 关闭是个不错的方法,但是有一个问题。
假设 channel a 首先被关闭,然后它会立刻返回。
但是由于 bclosed 仍然是 false,程序会进入死循环,
而让 channel b 永远不会被判定为关闭。(实际测试并没有死循环,而是仍有可能跑到b,只不过依靠概率,而改进则每个case只走1次,更为优雅)

func WaitMany(a, b chan bool) 
        for a != nil || b != nil 
                select 
                case <-a:
                			 fmt.Printf("close a\\n")
                        a = nil
                case <-b:
                			 fmt.Printf("close b\\n")
                        b = nil
                
        


func main() 
	a, b := make(chan bool), make(chan bool)
	t0 := time.Now()
	go func() 
		close(a)
		close(b)
	()
	WaitMany(a, b)
	fmt.Printf("waited %v for WaitMany\\n", time.Since(t0))


channel app

生产者消费者

func producer(c chan int64, max int) 
    defer close(c)
    for i:= 0; i < max; i ++ 
        c <- time.Now().Unix()
    



func consumer(c chan int64) 
    var v int64
    ok := true
    for ok 
        if v, ok = <-c; ok 
            fmt.Println(v)
        
    


自增长 ID 生成器

type AutoInc struct 
    start, step int
    queue chan int
    running bool


func New(start, step int) (ai *AutoInc) 
    ai = &AutoInc
        start: start,
        step: step,
        running: true,
        queue: make(chan int, 4),
    
    go ai.process()
    return


func (ai *AutoInc) process() 
    defer func() recover()()
    for i := ai.start; ai.running ; i=i+ai.step 
        ai.queue <- i
    


func (ai *AutoInc) Id() int 
    return <-ai.queue


func (ai *AutoInc) Close() 
    ai.running = false
    close(ai.queue)

信号量

var sem = make(chan int, MaxOutstanding)
 
func handle(r *Request) 
    sem <- 1    // 等待放行;
    process(r)  // 可能需要一个很长的处理过程;
    <-sem       // 完成,放行另一个过程。

 
func Serve(queue chan *Request) 
    for 
        req := <-queue
        go handle(req)  // 无需等待 handle 完成。
    

随机序列生成器

func producer(c chan int64, max int) 
    defer close(c)
    for i:= 0; i < max; i ++ 
        select  // randomized select
            case c <- 0:
            case c <- 1:
        
    


超时定时器

  • 当一个 channel 被 read/write 阻塞时,它会被永远阻塞下去,直到 channel 被关闭,这时会产生一个 panic。channel 没有内建用于超时的定时器。
  • 解决方案是使用另一个channel,以及select,这和time中的timer原理类似
c := make(chan int64, 5)
defer close(c)
timeout := make(chan bool)
defer close(timeout)
go func() 
    time.Sleep(time.Second) // 等一秒
    timeout <- true // 向超时队列中放入标志
()
select 
    case <-timeout: // 超时
        fmt.Println("timeout...")
    case <-c: // 收到数据
        fmt.Println("Read a date.")


channel trouble

channel trouble

  • 有太多的东西要考虑,并且永远记得:什么时候、如何关闭 channel;
  • 如何传递错误
  • 如何释放资源

你可以说,这些模式都不常见。不过……我在我的项目中不得不实现它们中的大多数。每!一!次!可能我不怎么走运,而你的项目会跟写给初学者的指南一样简单。

我知道,你们中的大多数会说“世界是艰辛的,编程是苦难的”。我会继续打击你:至少有一些语言展示了部分解决这些问题的示例。至少,在尝试解决它。Haskell 和 Scala 的类型系统提供了构建强大的高级抽象的能力,甚至自定义控制流来处理并发。而另一阵营的 Clojure 利用动态类型鼓励和共享高级的抽象。Rust 有 channel 和泛型。

让它工作 -> 让它优雅 -> 让它可重用。

现在,第一步已经完成。接下来呢?不要误会,go 是一个有远见的语言:channel 和 goroutine 比起例如 pthread 来说更好,不过是不是真得就停留在此?

func merge(cs ...<-chan int) <-chan int 
    var wg sync.WaitGroup
    out := make(chan int)
 
    // 为 cs 中每个输入的 channel 启动一个输出用的 goroutine。
    // 从 c 中复制值出来直到 c 被关闭,然后又调用 wg.Done。
    output := func(c <-chan int) 
        for n := range c 
            out <- n
        
        wg.Done()
    
    wg.Add(len(cs))
    for _, c := range cs 
        go output(c)
    
 
    // 一旦所有输出的 goroutine 完成的,就启动一个 goroutine 来关闭 out。
    // 这必须在 wg.Add 调用后启动。
    go func() 
        wg.Wait()
        close(out)
    ()
    return out


以上是关于[11][go] go concurrency的主要内容,如果未能解决你的问题,请参考以下文章

[11][go] go concurrency

北京大学机试 I Wanna Go Home 需要二刷 *最短路径衍生问题

Go’s Philosophy on Concurrency

GO语言基础之并发concurrency

go14--并发concurrency,Goroutine ,channel

go语言设计模式之Concurrency future