[11][go] go concurrency
Posted WhateverYoung
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[11][go] go concurrency相关的知识,希望对你有一定的参考价值。
绝妙的 CHANNEL
- channel提供了一种强大的、在不使用锁或临界区的情况下,从某个 goroutine 向其他 goroutine 发送数据流的方法。
- 发送者才能关闭channel
一个已经被关闭的 channel 永远都不会阻塞
- 已经被关闭的 channel。当一个 channel 一旦被关闭,就不能再向这个 channel 发送数据,不过你仍然可以尝试从 channel 中获取值。能够检测 channel 是否关闭是一个很有用的特性,可用于对 channel 进行 range 操作,并且当 channel 清空后退出循环。
- 上述其真正的价值是与 select 联合时体现的,场景是不知道有多少个go程在等待一个channel时,此时发送者关闭finish标志位channel,可以关闭上述所有channel且不阻塞。这个强大的机制在无需知道未知数量的 goroutine 的任何细节而向它们发送信号而成为可能,同时也不用担心死锁。
package main
import "fmt"
func main()
ch := make(chan bool, 2)
ch <- true
ch <- true
close(ch)
for i := 0; i < cap(ch) +1 ; i++
v, ok := <- ch
fmt.Println(v, ok)
true true
true true
false false
func main()
ch := make(chan bool, 2)
ch <- true
ch <- true
close(ch)
for v := range ch
fmt.Println(v) // 被调用两次
func main()
finish := make(chan bool)
var done sync.WaitGroup
done.Add(1)
go func()
select
case <-time.After(1 * time.Hour):
case <-finish:
done.Done()
()
t0 := time.Now()
finish <- true // 发送关闭信号
done.Wait() // 等待 goroutine 结束
fmt.Printf("Waited %v for goroutine to stop\\n", time.Since(t0))
有可能出问题的地方,第一,finish <- true // 发送关闭信号有可能阻塞;第二,如果多个go listen finish,且无法控制具体多少个的时候;前者可以通过缓冲防止阻塞或者select 带default防止阻塞。最好的方式是,利用关闭的channel不会阻塞,切回返回false这一特性,如下,是否需要Wait取决于是否需要等待所有go程返回?
func main()
const n = 100
finish := make(chan bool)
var done sync.WaitGroup
for i := 0; i < n; i++
done.Add(1)
go func()
select
case <-time.After(1 * time.Hour):
case <-finish:
done.Done()
()
t0 := time.Now()
close(finish) // 关闭 finish 使其立即返回
done.Wait() // 等待所有的 goroutine 结束
fmt.Printf("Waited %v for %d goroutines to stop\\n", time.Since(t0), n)
当 close(finish) 依赖于关闭 channel 的消息机制,而没有数据收发时,将 finish 定义为 type chan struct 表示 channel 没有任何数据;只对其关闭的特性感兴趣。
func main()
finish := make(chan struct)
var done sync.WaitGroup
done.Add(1)
go func()
select
case <-time.After(1 * time.Hour):
case <-finish:
done.Done()
()
t0 := time.Now()
close(finish)
done.Wait()
fmt.Printf("Waited %v for goroutine to stop\\n", time.Since(t0))
一个 nil channel 永远都是阻塞的
- 当 channel 的值尚未进行初始化或赋值为 nil 是,永远都是阻塞的,发送和接收都是如此
- 当使用已经关闭的 channel 机制来等待多个 channel 关闭的时候,这确实是一个很有用的特性。例如当 nil channel 是 select 语句的一部分时,它实际上会被忽略,因此,将 a 设置为 nil 便会将其从 select 中移除,仅仅留下 b 等待它被关闭,进而退出循环。
func main()
var ch chan bool
ch <- true // 永远阻塞
func main()
var ch chan bool
go func()
fmt.Println("1")
ch <- true // 永远阻塞
fmt.Println("2")
()
fmt.Println("sleep start")
time.Sleep(3 * time.Second)
fmt.Println("sleep finish")
ch = make(chan bool, 1) // 也不生效,因为go程传递的ch是没初始化的ch
time.Sleep(10 * time.Second)
func main()
var ch chan bool
<- ch // 永远阻塞
// WaitMany 等待 a 和 b 关闭。
func WaitMany(a, b chan bool)
var aclosed, bclosed bool
for !aclosed || !bclosed
select
case <-a:
fmt.Printf("close b\\n")
aclosed = true
case <-b:
fmt.Printf("close b\\n")
bclosed = true
WaitMany() 用于等待 channel a 和 b 关闭是个不错的方法,但是有一个问题。
假设 channel a 首先被关闭,然后它会立刻返回。
但是由于 bclosed 仍然是 false,程序会进入死循环,
而让 channel b 永远不会被判定为关闭。(实际测试并没有死循环,而是仍有可能跑到b,只不过依靠概率,而改进则每个case只走1次,更为优雅)
func WaitMany(a, b chan bool)
for a != nil || b != nil
select
case <-a:
fmt.Printf("close a\\n")
a = nil
case <-b:
fmt.Printf("close b\\n")
b = nil
func main()
a, b := make(chan bool), make(chan bool)
t0 := time.Now()
go func()
close(a)
close(b)
()
WaitMany(a, b)
fmt.Printf("waited %v for WaitMany\\n", time.Since(t0))
channel app
生产者消费者
func producer(c chan int64, max int)
defer close(c)
for i:= 0; i < max; i ++
c <- time.Now().Unix()
func consumer(c chan int64)
var v int64
ok := true
for ok
if v, ok = <-c; ok
fmt.Println(v)
自增长 ID 生成器
type AutoInc struct
start, step int
queue chan int
running bool
func New(start, step int) (ai *AutoInc)
ai = &AutoInc
start: start,
step: step,
running: true,
queue: make(chan int, 4),
go ai.process()
return
func (ai *AutoInc) process()
defer func() recover()()
for i := ai.start; ai.running ; i=i+ai.step
ai.queue <- i
func (ai *AutoInc) Id() int
return <-ai.queue
func (ai *AutoInc) Close()
ai.running = false
close(ai.queue)
信号量
var sem = make(chan int, MaxOutstanding)
func handle(r *Request)
sem <- 1 // 等待放行;
process(r) // 可能需要一个很长的处理过程;
<-sem // 完成,放行另一个过程。
func Serve(queue chan *Request)
for
req := <-queue
go handle(req) // 无需等待 handle 完成。
随机序列生成器
func producer(c chan int64, max int)
defer close(c)
for i:= 0; i < max; i ++
select // randomized select
case c <- 0:
case c <- 1:
超时定时器
- 当一个 channel 被 read/write 阻塞时,它会被永远阻塞下去,直到 channel 被关闭,这时会产生一个 panic。channel 没有内建用于超时的定时器。
- 解决方案是使用另一个channel,以及select,这和time中的timer原理类似
c := make(chan int64, 5)
defer close(c)
timeout := make(chan bool)
defer close(timeout)
go func()
time.Sleep(time.Second) // 等一秒
timeout <- true // 向超时队列中放入标志
()
select
case <-timeout: // 超时
fmt.Println("timeout...")
case <-c: // 收到数据
fmt.Println("Read a date.")
channel trouble
- 有太多的东西要考虑,并且永远记得:什么时候、如何关闭 channel;
- 如何传递错误
- 如何释放资源
你可以说,这些模式都不常见。不过……我在我的项目中不得不实现它们中的大多数。每!一!次!可能我不怎么走运,而你的项目会跟写给初学者的指南一样简单。
我知道,你们中的大多数会说“世界是艰辛的,编程是苦难的”。我会继续打击你:至少有一些语言展示了部分解决这些问题的示例。至少,在尝试解决它。Haskell 和 Scala 的类型系统提供了构建强大的高级抽象的能力,甚至自定义控制流来处理并发。而另一阵营的 Clojure 利用动态类型鼓励和共享高级的抽象。Rust 有 channel 和泛型。
让它工作 -> 让它优雅 -> 让它可重用。
现在,第一步已经完成。接下来呢?不要误会,go 是一个有远见的语言:channel 和 goroutine 比起例如 pthread 来说更好,不过是不是真得就停留在此?
func merge(cs ...<-chan int) <-chan int
var wg sync.WaitGroup
out := make(chan int)
// 为 cs 中每个输入的 channel 启动一个输出用的 goroutine。
// 从 c 中复制值出来直到 c 被关闭,然后又调用 wg.Done。
output := func(c <-chan int)
for n := range c
out <- n
wg.Done()
wg.Add(len(cs))
for _, c := range cs
go output(c)
// 一旦所有输出的 goroutine 完成的,就启动一个 goroutine 来关闭 out。
// 这必须在 wg.Add 调用后启动。
go func()
wg.Wait()
close(out)
()
return out
以上是关于[11][go] go concurrency的主要内容,如果未能解决你的问题,请参考以下文章
北京大学机试 I Wanna Go Home 需要二刷 *最短路径衍生问题
Go’s Philosophy on Concurrency