Golang中常见并发模式
Posted Go编程之旅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang中常见并发模式相关的知识,希望对你有一定的参考价值。
sync.Mutex模式错误使用
package main
import(
"fmt" "sync"
)
func main() {
var mu sync.Mutex
go func(){
fmt.Println("你好, 世界")
mu.Lock()
}()
mu.Unlock()
}
运行结果
$ go run test.go
fatal error: sync: unlock of unlocked mutex
goroutine 1 [running]:
runtime.throw(0x4b8027, 0x1e)
/opt/go/src/runtime/panic.go:608 +0x72 fp=0xc000032720 sp=0xc0000326f0 pc=0x426dd2
sync.throw(0x4b8027, 0x1e)
/opt/go/src/runtime/panic.go:594 +0x35 fp=0xc000032740 sp=0xc000032720 pc=0x426d55
sync.(*Mutex).Unlock(0xc000018110)
/opt/go/src/sync/mutex.go:184 +0xc1 fp=0xc000032768 sp=0xc000032740 pc=0x45eed1
main.main()
/home/kenmy/go/src/test.go:17 +0x62 fp=0xc000032798 sp=0xc000032768 pc=0x4849a2
runtime.main()
/opt/go/src/runtime/proc.go:201 +0x207 fp=0xc0000327e0 sp=0xc000032798 pc=0x428747
runtime.goexit()
/opt/go/src/runtime/asm_amd64.s:1333 +0x1 fp=0xc0000327e8 sp=0xc0000327e0 pc=0x450701
goroutine 5 [runnable]:
main.main.func1(0xc000018110)
/home/kenmy/go/src/test.go:12
created by main.main
/home/kenmy/go/src/test.go:12 +0x54
exit status 2
解释
函数中的mu.Unlock()很有可能先发生,而这个时刻mu互斥对象还处于未加锁的状态,从而会导致运行时异常。
sync.Mutex模式正确使用
package main
import(
"fmt" "sync"
)
func main() {
var mu sync.Mutex
mu.Lock()
go func(){
fmt.Println("你好, 世界")
mu.Unlock()
}()
mu.Lock()
}
运行结果
$ go run test.go 你好, 世界
解析
在main函数所在线程中执行两次mu.Lock(),当第二次加锁时会因为锁已经
被占用(不是递归锁)而阻塞,main函数的阻塞状态驱动后台线程继续向前执行。当后台
线程执行到mu.Unlock()时解锁,此时打印工作已经完成了,解锁会导致main函数中的第
二个mu.Lock()阻塞状态取消,此时后台线程和主线程再没有其它的同步事件参考,它们
退出的事件将是并发的:在main函数退出导致程序退出时,后台线程可能已经退出了,也
可能没有退出。虽然无法确定两个线程退出的时间,但是打印工作是可以正确完成的。
用无缓存的管道来实现同步
package main
import(
"fmt"
)
func main() {
done := make(chan int)
go func(){
fmt.Println("你好, 世界")
<-done
}()
done <- 1
}
运行结果
$ go run test.go 你好, 世界
解析
根据Go语言内存模型规范,对于从无缓冲Channel进行的接收,发生在对该Channel进行的
发送完成之前。因此,后台线程<-done接收操作完成之后,main线程的done <- 1发送操作
才可能完成(从而退出main、退出程序),而此时打印工作已经完成了。
用有缓存的管道来实现同步,会出现无打印
package main
import(
"fmt"
)
func main() {
done := make(chan int, 1)
go func(){
fmt.Println("你好, 世界")
<-done
}()
done <- 1
}
运行结果
没有打印
# go build test.go
用有缓存的管道来实现同步,有打印
package main
import(
"fmt"
)
func main() {
done := make(chan int, 1)
go func(){
fmt.Println("你好, 世界")
done<-1
}()
<-done
}
运行结果
$ go run test.go 你好, 世界
带缓存并发模式
package main
import(
"fmt"
)
func main() {
done := make(chan int, 10) // 带 10 个缓存
// 开N个后台打印线程
for i := 0; i < cap(done); i++ {
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
} // 等待N个后台线程完成
for i := 0; i < cap(done); i++ {
<-done
}
}
运行结果
$ go run test.go 你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
sync.WaitGroup模式并发
package main
import(
"fmt" "sync"
)
func main() {
var wg sync.WaitGroup
// 开N个后台打印线程
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
fmt.Println("你好, 世界")
wg.Done()
}()
} // 等待N个后台线程完成
wg.Wait()
}
运行结果
$ go run test.go 你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
生产者消费者模型
package main
import(
"fmt"
"os"
"os/signal"
"syscall"
)
// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
for i := 0;i <= 10 ; i++ {
out <- i*factor
}
}
// 消费者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64) // 成果队列
go Producer(3, ch) // 生成 3 的倍数的序列
go Producer(5, ch) // 生成 5 的倍数的序列
go Consumer(ch) // 消费 生成的队列
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}
运行结果
$ go run test.go
03691215182124273005101520253035404550
发布订阅模型
package main
import (
"sync"
"time"
"fmt"
"strings"
)
type (
subscriber chan interface{} // 订阅者为一个管道
topicFunc func(v interface{}) bool // 主题为一个过滤器)// 发布者对象type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存大小
timeout time.Duration // 发布超时时间
subscribers map[subscriber]topicFunc // 订阅者信息
}
// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock() return ch
}
// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 关闭发布者对象,同时关闭所有的订阅者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 发送主题,可以容忍一定的超时
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
func main() {
p := NewPublisher(100*time.Millisecond, 10)
defer p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
p.Publish("hello, world!")
p.Publish("hello, golang!")
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} () // 运行一定时间后退出
time.Sleep(3 * time.Second)
}
运行结果
$ go run test.go golang: hello, golang!all: hello, world!all: hello, golang!
并发的安全退出1
package main
import (
"fmt"
"time"
)
func main() {
v := make(chan int, 1)
worker(v)
}
func worker(in chan int) {
select {
case v := <-in:
fmt.Println(v)
case <-time.After(10 * time.Second):
return // 超时
}
}
运行结果
超过10s退出
并发的安全退出2
package main
import (
"fmt"
"time"
)
func worker(cannel chan bool) {
for {
select {
default:
time.Sleep(900 * time.Millisecond)
fmt.Println("hello") // 正常工作
case <-cannel: // 退出
}
}
}
func main() {
cannel := make(chan bool)
go worker(cannel)
time.Sleep(1 * time.Second)
cannel <- true
}
运行结果
$ go run test.go hello
hello
并发的安全退出3
package main
import (
"fmt" "time"
)
func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello") // 正常工作
case <-cannel: // 退出
}
}
}
func main() {
cancel := make(chan bool)
for i := 0; i < 10; i++ {
go worker(cancel)
}
time.Sleep(time.Second)
close(cancel)
}
运行结果
运行1s后退出
解析
管道的发送操作和接收操作是一一对应的,如果要停止多个Goroutine那么可能
需要创建同样数量的管道,这个代价太大了。其实我们可以通过close关闭一个
管道来实现广播的效果,所有从关闭管道接收的操作均会收到一个零值和一个
可选的失败标志。
并发的安全退出4
package main
import (
"fmt"
"time"
"sync"
)
func worker(wg *sync.WaitGroup, cannel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cannel:
return
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel)
wg.Wait()
}
运行结果
运行1s后退出
解析
通过close来关闭cancel管道向多个Goroutine广播退出的指令。不过这个程序依然
不够稳健:当每个Goroutine收到退出指令退出时一般会进行一定的清理工作,但是
退出的清理工作并不能保证被完成,因为main线程并没有等待各个工作Goroutine退
出工作完成的机制。我们可以结合sync.WaitGroup来改进
并发的安全退出-context
package main
import (
"fmt"
"time"
"sync"
"context"
)
func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done():
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}
解析
当并发体超时或main主动停止工作者Goroutine时,每个工作者都可以安全退出。
并发的安全退出-context
package main
import (
"fmt" "context"
)
// 返回生成自然数序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道过滤器: 删除能被素数整除的数
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
select {
case <- ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
// 通过 Context 控制后台Goroutine状态
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出现的素数
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器
}
cancel()
}
解析
当main函数完成工作前,通过调用cancel()来通知后台Goroutine退出,这样就避免了Goroutine的泄漏。
以上是关于Golang中常见并发模式的主要内容,如果未能解决你的问题,请参考以下文章