Go语言实战 并发模式

Posted ycx95

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言实战 并发模式相关的知识,希望对你有一定的参考价值。

章节目录学习内容有:runner、pool、Go读写锁、以及总结。总结我习惯将其放在前面。

总结

稍后添加

runner

common.go

package common

import (
    "time"
    "os"
    "errors"
    "os/signal"
)
var ErrTimeOut = errors.New("执行者执行超时")
var ErrInterrupt = errors.New("执行者被中断")
// 一个执行者可以执行任何任务 但是任务是限制完成的 该执行者可以通过发送终止信号终止它
type Runner struct{
    tasks []func(int) //要执行的任务
    complete chan error //用于通知任务全部完成  定义出现错误的原因 无错误返回nil
    timeout <-chan time.Time //单向通道 这些任务再多久内完成
    interrupt chan  os.Signal //可以控制强制终止的信号
}
//用于返回我们需要的Runner
//参数,用来设置这个执行者的超时时间 func New(tm time.Duration)
*Runner{ return &Runner{ complete:make(chan error), timeout:time.After(tm), interrupt:make(chan os.Signal,1), } } //将需要执行的任务 添加到Runner里 func (r *Runner) Add(tasks ...func(int)){ r.tasks = append(r.tasks,tasks...) } func (r *Runner) run() error { for id,task := range r.tasks { if r.isInterrupt(){ return ErrInterrupt } task(id) } return nil } //检查是否接收到了中断信号 func (r *Runner) isInterrupt() bool{ select { case <-r.interrupt: signal.Stop(r.interrupt) return true default: return false } } func (r *Runner) Start() error{ signal.Notify(r.interrupt,os.Interrupt) go func() { r.complete <- r.run() }() select { case err := <-r.complete: return err case <-r.timeout: return ErrTimeOut } }

 

main.go

func main() {
    log.Println("...开始执行任务...")

    timeout := 3 * time.Second  //定义超时时间为3s
    r := common.New(timeout)

    r.Add(createTask(), createTask(), createTask()) //添加3个任务

    if err:=r.Start();err!=nil{
        switch err {
        case common.ErrTimeOut:
            log.Println(err)
            os.Exit(1)
        case common.ErrInterrupt:
            log.Println(err)
            os.Exit(2)
        }
    }
    log.Println("...任务执行结束...")
}

func createTask() func(int) {
    return func(id int) {
        log.Printf("正在执行任务%d", id)
        time.Sleep(time.Duration(id)* time.Second)
    }
}

 

输出:

2018/07/31 16:40:42 ...开始执行任务...
2018/07/31 16:40:42 正在执行任务0
2018/07/31 16:40:42 正在执行任务1
2018/07/31 16:40:43 正在执行任务2
2018/07/31 16:40:45 执行者执行超时

pool

common.go

//一个安全的资源池,被管理的资源必须都实现io.Close接口
type Pool struct {
    m       sync.Mutex                       //m是一个互斥锁
    res     chan io.Closer                   //有缓冲通道 大小在初始化Pool指定
    factory func() (io.Closer, error)        //生成新资源的函数
    closed  bool                             //资源是否关闭
}

var ErrPoolClosed = errors.New("资源池已经被关闭。")

//创建一个资源池  接受两个参数:fn创建新资源函数 size指定资源池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}
//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire:共享资源")
        if !ok {
            return nil,ErrPoolClosed
        }
        return r,nil
    default:
        log.Println("Acquire:新生成资源")
        return p.factory()
    }
}

//关闭资源池,释放资源
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        return
    }

    p.closed = true

    //关闭通道,不让写入了
    close(p.res)

    //关闭通道里的资源
    for r:=range p.res {
        r.Close()
    }
}

func (p *Pool) Release(r io.Closer){
    //保证该操作和Close方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()

    //资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {
        r.Close()
        return
    }

    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()
    }
}

 

main.go

const (
    //模拟的最大goroutine
    maxGoroutine = 5
    //资源池的大小
    poolRes      = 2
)

func main() {
    //等待任务完成
    var wg sync.WaitGroup
    wg.Add(maxGoroutine)

    p, err := common.New(createConnection, poolRes)
    if err != nil {
        log.Println(err)
        return
    }
    //模拟好几个goroutine同时使用资源池查询数据
    for query := 0; query < maxGoroutine; query++ {
        go func(q int) {
            dbQuery(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()
    log.Println("开始关闭资源池")
    p.Close()
}

//模拟数据库查询
func dbQuery(query int, pool *common.Pool) {
    conn, err := pool.Acquire()
    if err != nil {
        log.Println(err)
        return
    }

    defer pool.Release(conn)

    //模拟查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)
}
//数据库连接
type dbConnection struct {
    ID int32//连接的标志
}

//实现io.Closer接口
func (db *dbConnection) Close() error {
    log.Println("关闭连接", db.ID)
    return nil
}

var idCounter int32

//生成数据库连接的方法,以供资源池使用
func createConnection() (io.Closer, error) {
    //并发安全,给数据库连接生成唯一标志
    id := atomic.AddInt32(&idCounter, 1)
    return &dbConnection{id}, nil
}

 

输出:

2018/07/31 16:51:58 Acquire:新生成资源
2018/07/31 16:51:58 Acquire:新生成资源
2018/07/31 16:51:58 Acquire:新生成资源
2018/07/31 16:51:58 Acquire:新生成资源
2018/07/31 16:51:58 Acquire:新生成资源
2018/07/31 16:51:58 第1个查询,使用的是ID为4的数据库连接
2018/07/31 16:51:58 资源释放到池子里了
2018/07/31 16:51:58 第2个查询,使用的是ID为5的数据库连接
2018/07/31 16:51:58 资源释放到池子里了
2018/07/31 16:51:58 第4个查询,使用的是ID为1的数据库连接
2018/07/31 16:51:58 资源池满了,释放这个资源吧
2018/07/31 16:51:58 关闭连接 1
2018/07/31 16:51:58 第3个查询,使用的是ID为3的数据库连接
2018/07/31 16:51:58 资源池满了,释放这个资源吧
2018/07/31 16:51:58 关闭连接 3
2018/07/31 16:51:58 第0个查询,使用的是ID为2的数据库连接
2018/07/31 16:51:58 资源池满了,释放这个资源吧
2018/07/31 16:51:58 关闭连接 2
2018/07/31 16:51:58 开始关闭资源池
2018/07/31 16:51:58 关闭连接 4
2018/07/31 16:51:58 关闭连接 5

 

由于资源使用频繁,Go提供原生的资源池管理,利用sync.Pool实现

package main

import (
    "sync"
    "time"
    "math/rand"
    "log"
    "sync/atomic"
)

const (
    //模拟的最大goroutine
    maxGoroutine = 5
)

func main() {
    //等待任务完成
    var wg sync.WaitGroup
    wg.Add(maxGoroutine)

    p:=&sync.Pool{
        New:createConnection,
    }

    //模拟好几个goroutine同时使用资源池查询数据
    for query := 0; query < maxGoroutine; query++ {
        go func(q int) {
            dbQuery(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()
}

//模拟数据库查询
func dbQuery(query int, pool *sync.Pool) {
    conn:=pool.Get().(*dbConnection)

    defer pool.Put(conn)

    //模拟查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)
}
//数据库连接
type dbConnection struct {
    ID int32//连接的标志
}

//实现io.Closer接口
func (db *dbConnection) Close() error {
    log.Println("关闭连接", db.ID)
    return nil
}

var idCounter int32

//生成数据库连接的方法,以供资源池使用
func createConnection() interface{} {
    //并发安全,给数据库连接生成唯一标志
    id := atomic.AddInt32(&idCounter, 1)
    return &dbConnection{ID:id}
}

 

输出

2018/07/31 16:55:40 第2个查询,使用的是ID为4的数据库连接
2018/07/31 16:55:40 第4个查询,使用的是ID为1的数据库连接
2018/07/31 16:55:40 第3个查询,使用的是ID为5的数据库连接
2018/07/31 16:55:41 第1个查询,使用的是ID为3的数据库连接
2018/07/31 16:55:41 第0个查询,使用的是ID为2的数据库连接

Go读写锁

技术分享图片
package main

import (
    "sync"
    "fmt"
    "math/rand"
)

var count int
var wg sync.WaitGroup

func main() {
    wg.Add(10)

    for i:=0;i<5;i++ {
        go read(i)
    }

    for i:=0;i<5;i++ {
        go write(i);
    }

    wg.Wait()
}

func read(n int) {
    fmt.Printf("读goroutine %d 正在读取...
",n)

    v := count

    fmt.Printf("读goroutine %d 读取结束,值为:%d
", n,v)
    wg.Done()
}

func write(n int) {
    fmt.Printf("写goroutine %d 正在写入...
",n)
    v := rand.Intn(1000)

    count = v

    fmt.Printf("写goroutine %d 写入结束,新值为:%d
", n,v)
    wg.Done()
}
存在竞争态例子

 

 

输出

写goroutine 1 正在写入...
写goroutine 1 写入结束,新值为:81
写goroutine 0 正在写入...
写goroutine 0 写入结束,新值为:887
读goroutine 1 正在读取...
读goroutine 1 读取结束,值为:887
读goroutine 0 正在读取...
读goroutine 0 读取结束,值为:887
写goroutine 2 正在写入...
写goroutine 2 写入结束,新值为:847
写goroutine 3 正在写入...
写goroutine 3 写入结束,新值为:59
读goroutine 2 正在读取...
读goroutine 2 读取结束,值为:59
读goroutine 3 正在读取...
读goroutine 3 读取结束,值为:59
写goroutine 4 正在写入...
读goroutine 4 正在读取...
读goroutine 4 读取结束,值为:81
写goroutine 4 写入结束,新值为:81

 

方法一:加互斥锁

方案二:读写锁

var count int
var wg sync.WaitGroup
var rw sync.RWMutex

func main() {
    wg.Add(10)

    for i:=0;i<5;i++ {
        go read(i)
    }

    for i:=0;i<5;i++ {
        go write(i);
    }

    wg.Wait()
}

func read(n int) {
    rw.RLock()
    fmt.Printf("读goroutine %d 正在读取...
",n)

    v := count

    fmt.Printf("读goroutine %d 读取结束,值为:%d
", n,v)
    wg.Done()
    rw.RUnlock()
}

func write(n int) {
    rw.Lock()
    fmt.Printf("写goroutine %d 正在写入...
",n)
    v := rand.Intn(1000)

    count = v

    fmt.Printf("写goroutine %d 写入结束,新值为:%d
", n,v)
    wg.Done()
    rw.Unlock()
}

 

输出 

读goroutine 2 正在读取...
读goroutine 2 读取结束,值为:0
写goroutine 4 正在写入...
写goroutine 4 写入结束,新值为:81
读goroutine 3 正在读取...
读goroutine 3 读取结束,值为:81
读goroutine 4 正在读取...
读goroutine 4 读取结束,值为:81
读goroutine 0 正在读取...
读goroutine 0 读取结束,值为:81
读goroutine 1 正在读取...
读goroutine 1 读取结束,值为:81
写goroutine 0 正在写入...
写goroutine 0 写入结束,新值为:887
写goroutine 1 正在写入...
写goroutine 1 写入结束,新值为:847
写goroutine 2 正在写入...
写goroutine 2 写入结束,新值为:59
写goroutine 3 正在写入...
写goroutine 3 写入结束,新值为:81

实现一个安全的Map

//安全的Map
type SynchronizedMap struct {
    rw *sync.RWMutex
    data map[interface{}]interface{}
}
//存储操作
func (sm *SynchronizedMap) Put(k,v interface{}){
    sm.rw.Lock()
    defer sm.rw.Unlock()

    sm.data[k]=v
}
//获取操作
func (sm *SynchronizedMap) Get(k interface{}) interface{}{
    sm.rw.RLock()
    defer sm.rw.RUnlock()

    return sm.data[k]
}

//删除操作
func (sm *SynchronizedMap) Delete(k interface{}) {
    sm.rw.Lock()
    defer sm.rw.Unlock()

    delete(sm.data,k)
}

//遍历Map,并且把遍历的值给回调函数,可以让调用者控制做任何事情
func (sm *SynchronizedMap) Each(cb func (interface{},interface{})){
    sm.rw.RLock()
    defer sm.rw.RUnlock()

    for k, v := range sm.data {
        cb(k,v)
    }
}

//生成初始化一个SynchronizedMap
func NewSynchronizedMap() *SynchronizedMap{
    return &SynchronizedMap{
        rw:new(sync.RWMutex),
        data:make(map[interface{}]interface{}),
    }
}

 

以上是关于Go语言实战 并发模式的主要内容,如果未能解决你的问题,请参考以下文章

《Go语言实战》摘录:7.3 并发模式 - work

《Go语言实战》摘录:7.1 并发模式 - runner

《Go语言实战》摘录:7.2 并发模式 - pool

《Go语言实战》摘录:7.2 并发模式 - pool

Go语言实战 并发模式

关于Go语言,你不得不知的并发模式!