Golang中常见并发模式

Posted Go编程之旅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang中常见并发模式相关的知识,希望对你有一定的参考价值。

sync.Mutex模式错误使用

package main

import(
   "fmt"    "sync"
   
)

func main() {    
   var mu sync.Mutex    go func(){        fmt.Println("你好, 世界")        mu.Lock()    }()    mu.Unlock() }
运行结果
$ go run test.go 
    fatal error: sync: unlock of unlocked mutex
    
    goroutine 1 [running]:
    runtime.throw(0x4b8027, 0x1e)
    	/opt/go/src/runtime/panic.go:608 +0x72 fp=0xc000032720 sp=0xc0000326f0 pc=0x426dd2
    sync.throw(0x4b8027, 0x1e)
    	/opt/go/src/runtime/panic.go:594 +0x35 fp=0xc000032740 sp=0xc000032720 pc=0x426d55
    sync.(*Mutex).Unlock(0xc000018110)
    	/opt/go/src/sync/mutex.go:184 +0xc1 fp=0xc000032768 sp=0xc000032740 pc=0x45eed1
    main.main()
    	/home/kenmy/go/src/test.go:17 +0x62 fp=0xc000032798 sp=0xc000032768 pc=0x4849a2
    runtime.main()
    	/opt/go/src/runtime/proc.go:201 +0x207 fp=0xc0000327e0 sp=0xc000032798 pc=0x428747
    runtime.goexit()
    	/opt/go/src/runtime/asm_amd64.s:1333 +0x1 fp=0xc0000327e8 sp=0xc0000327e0 pc=0x450701
    
    goroutine 5 [runnable]:
    main.main.func1(0xc000018110)
    	/home/kenmy/go/src/test.go:12
    created by main.main
    	/home/kenmy/go/src/test.go:12 +0x54
    exit status 2
解释
函数中的mu.Unlock()很有可能先发生,而这个时刻mu互斥对象还处于未加锁的状态,从而会导致运行时异常。

sync.Mutex模式正确使用

package main
import(
   "fmt"    "sync"
   
)

func main() {    
   var mu sync.Mutex    mu.Lock()    go func(){        fmt.Println("你好, 世界")        mu.Unlock()    }()    mu.Lock() }
运行结果
$ go run test.go 你好, 世界
解析
在main函数所在线程中执行两次mu.Lock(),当第二次加锁时会因为锁已经
被占用(不是递归锁)而阻塞,main函数的阻塞状态驱动后台线程继续向前执行。当后台
线程执行到mu.Unlock()时解锁,此时打印工作已经完成了,解锁会导致main函数中的第
二个mu.Lock()阻塞状态取消,此时后台线程和主线程再没有其它的同步事件参考,它们
退出的事件将是并发的:在main函数退出导致程序退出时,后台线程可能已经退出了,也
可能没有退出。虽然无法确定两个线程退出的时间,但是打印工作是可以正确完成的。

用无缓存的管道来实现同步

package main

import(
   "fmt"

)
   
func main() {    done := make(chan int)    
   go func(){        fmt.Println("你好, 世界")        <-done    }()    done <- 1

}
运行结果
$ go run test.go 你好, 世界
解析
根据Go语言内存模型规范,对于从无缓冲Channel进行的接收,发生在对该Channel进行的    
发送完成之前。因此,后台线程<-done接收操作完成之后,main线程的done <- 1发送操作
才可能完成(从而退出main、退出程序),而此时打印工作已经完成了。

用有缓存的管道来实现同步,会出现无打印

package main

import(
   "fmt"
   
)

func main() {    done := make(chan int, 1)    
   go func(){        fmt.Println("你好, 世界")        <-done    }()    done <- 1

}
运行结果

没有打印

# go build test.go

用有缓存的管道来实现同步,有打印

package main

import(
   "fmt"

)
func main() {    done := make(chan int, 1)    
   go func(){        fmt.Println("你好, 世界")        done<-1    }()    <-done }
运行结果
$ go run test.go 你好, 世界

带缓存并发模式

package main
import(
   "fmt"
   
)
func main() {    done := make(chan int, 10) // 带 10 个缓存    // 开N个后台打印线程    for i := 0; i < cap(done); i++ {        
       go func(){            fmt.Println("你好, 世界")            done <- 1        }()    }    // 等待N个后台线程完成    for i := 0; i < cap(done); i++ {        <-done    } }
运行结果
$ go run  test.go 你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界

sync.WaitGroup模式并发

package main

import(
   "fmt"    "sync"
   
)

func main() {    
   var wg sync.WaitGroup    // 开N个后台打印线程    for i := 0; i < 10; i++ {        wg.Add(1)        go func() {            fmt.Println("你好, 世界")            wg.Done()        }()    }    // 等待N个后台线程完成    wg.Wait() }
运行结果
$ go run  test.go 你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界
你好, 世界

生产者消费者模型

package main

import(
       "fmt" "os" "os/signal" "syscall"

)
// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {    
   for i := 0;i <= 10 ; i++ {        out <- i*factor    } }

// 消费者
func Consumer(in <-chan int) {    
   for v := range in {        fmt.Println(v)    } }
   
func main() {    ch := make(chan int, 64) // 成果队列    go Producer(3, ch) // 生成 3 的倍数的序列    go Producer(5, ch) // 生成 5 的倍数的序列    go Consumer(ch)    // 消费 生成的队列    // Ctrl+C 退出    sig := make(chan os.Signal, 1)    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)    fmt.Printf("quit (%v)\n", <-sig) }
运行结果
$ go run test.go 
03691215182124273005101520253035404550

发布订阅模型

package  main

import (    
   "sync"    "time"    "fmt"    "strings"

)

type (    subscriber chan interface{}         // 订阅者为一个管道    topicFunc  func(v interface{}) bool // 主题为一个过滤器)// 发布者对象type Publisher struct {    m           sync.RWMutex             // 读写锁    buffer      int                      // 订阅队列的缓存大小    timeout     time.Duration            // 发布超时时间    subscribers map[subscriber]topicFunc // 订阅者信息

}

// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度

func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {    
   return &Publisher{        buffer:      buffer,        timeout:     publishTimeout,        subscribers: make(map[subscriber]topicFunc),    } }
   
// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {    
   return p.SubscribeTopic(nil) }
   
// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {    ch := make(chan interface{}, p.buffer)    p.m.Lock()    p.subscribers[ch] = topic    p.m.Unlock()    return ch }

// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {    p.m.Lock()    
   defer p.m.Unlock()    
   delete(p.subscribers, sub)    
   close(sub) }
   
// 发布一个主题
func (p *Publisher) Publish(v interface{}) {    p.m.RLock()    
   defer p.m.RUnlock()    
   var wg sync.WaitGroup    
   for sub, topic := range p.subscribers {        wg.Add(1)        
       go p.sendTopic(sub, topic, v, &wg)    }    wg.Wait() }
       
// 关闭发布者对象,同时关闭所有的订阅者管道。

func (p *Publisher) Close() {     p.m.Lock()    
    defer p.m.Unlock()    
    for sub := range p.subscribers {        
        delete(p.subscribers, sub)        
        close(sub)    } }

// 发送主题,可以容忍一定的超时

func (p *Publisher) sendTopic(    sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup, ) {    
    defer wg.Done()    
    if topic != nil && !topic(v) {        
        return     }    
    select {    
        case sub <- v:    
        case <-time.After(p.timeout):    } }
       
func main() {    p := NewPublisher(100*time.Millisecond, 10)    
   defer p.Close()    all := p.Subscribe()    golang := p.SubscribeTopic(func(v interface{}) bool {        
       if s, ok := v.(string); ok {            
           return strings.Contains(s, "golang")        }        
       return false    })    p.Publish("hello,  world!")    p.Publish("hello, golang!")    
   go func() {        
       for  msg := range all {            fmt.Println("all:", msg)        }    } ()    
   
   go func() {        
       for  msg := range golang {            fmt.Println("golang:", msg)        }    } ()    // 运行一定时间后退出    time.Sleep(3 * time.Second) }
运行结果
$ go run test.go golang: hello, golang!all: hello,  world!all: hello, golang!

并发的安全退出1

package  main
import (
       "fmt" "time"

)

func
main() {        v := make(chan int, 1) worker(v) }

func worker(in chan int) {
   select {
       case v := <-in:    fmt.Println(v)
       case <-time.After(10 * time.Second):    
           return // 超时 } }
运行结果
超过10s退出

并发的安全退出2

package  main
import (
       "fmt" "time"
)

func
worker(cannel chan bool) {    
   for {        
       select {        
           default:                time.Sleep(900 * time.Millisecond)                fmt.Println("hello")            // 正常工作            case <-cannel:            // 退出        }    } }

func
main() {    cannel := make(chan bool)    
   go worker(cannel)    time.Sleep(1 * time.Second)    cannel <- true

}
运行结果
$ go run test.go hello
hello

并发的安全退出3

package  main
import (
   "fmt"    "time"  
)

func
worker(cannel chan bool) {    
   for {        
       select {        
           default:                fmt.Println("hello")            // 正常工作            case <-cannel:            // 退出        }    } }
           
func main() {    cancel := make(chan bool)    
   for i := 0; i < 10; i++ {        
       go worker(cancel)    }    time.Sleep(time.Second)    
   close(cancel) }
运行结果
运行1s后退出
解析
管道的发送操作和接收操作是一一对应的,如果要停止多个Goroutine那么可能
需要创建同样数量的管道,这个代价太大了。其实我们可以通过close关闭一个
管道来实现广播的效果,所有从关闭管道接收的操作均会收到一个零值和一个
可选的失败标志。

并发的安全退出4

package  main
import (
       "fmt" "time" "sync"

)
func worker(wg *sync.WaitGroup, cannel chan bool) {    
   defer wg.Done()    
   for {        
       select {        
           default:                fmt.Println("hello")        
           case <-cannel:            
               return        }    } }
               
func main() {    cancel := make(chan bool)    
   var wg sync.WaitGroup    
   for i := 0; i < 10; i++ {        wg.Add(1)        
       go worker(&wg, cancel)    }    time.Sleep(time.Second)    
   close(cancel)    wg.Wait() }
运行结果
运行1s后退出
解析
通过close来关闭cancel管道向多个Goroutine广播退出的指令。不过这个程序依然
不够稳健:当每个Goroutine收到退出指令退出时一般会进行一定的清理工作,但是
退出的清理工作并不能保证被完成,因为main线程并没有等待各个工作Goroutine退
出工作完成的机制。我们可以结合sync.WaitGroup来改进

并发的安全退出-context

package  main

import (	
       "fmt" "time" "sync" "context"

)
func worker(ctx context.Context, wg *sync.WaitGroup) error {    defer wg.Done()    
   for {        
       select {        
           default:                fmt.Println("hello")        
           case <-ctx.Done():            
               return ctx.Err()        }    } }

func
main() {
   ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)    var wg sync.WaitGroup    
   for i := 0; i < 10; i++ {        wg.Add(1)        go worker(ctx, &wg)    }    time.Sleep(time.Second)    cancel()    wg.Wait() }
解析
当并发体超时或main主动停止工作者Goroutine时,每个工作者都可以安全退出。

并发的安全退出-context

package  main
import (
   "fmt"    "context"
   
)

// 返回生成自然数序列的管道: 2, 3, 4, ...

func GenerateNatural(ctx context.Context) chan int {    ch := make(chan int)    
   go func() {        
       for i := 2; ; i++ {            
           select {            
               case <- ctx.Done():                
                   return                case ch <- i:            }       }    }()    
   return ch }

// 管道过滤器: 删除能被素数整除的数

func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {    out := make(chan int)    
   go func() {        
       for {            
           if i := <-in; i%prime != 0 {                
               select {                
                   case <- ctx.Done():                    
                       return                    case out <- i:                }            }        }    }()    
   return out }

func
main() {    
   // 通过 Context 控制后台Goroutine状态    ctx, cancel := context.WithCancel(context.Background())    ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...    for i := 0; i < 100; i++ {        prime := <-ch // 新出现的素数        fmt.Printf("%v: %v\n", i+1, prime)        ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器    }    cancel() }
解析
当main函数完成工作前,通过调用cancel()来通知后台Goroutine退出,这样就避免了Goroutine的泄漏。


以上是关于Golang中常见并发模式的主要内容,如果未能解决你的问题,请参考以下文章

golang代码片段(摘抄)

Golang 并发模式

Golang 并发模式

4种Golang并发操作中常见的死锁情形

4种Golang并发操作中常见的死锁情形

4种Golang并发操作中常见的死锁情形