GoLang协程与通道---下

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了GoLang协程与通道---下相关的知识,希望对你有一定的参考价值。

GoLang协程与通道---下


新旧模型对比:任务和worker

假设我们需要处理很多任务;一个worker处理一项任务。任务可以被定义为一个结构体(具体的细节在这里并不重要):

type Task struct 
    // some state

旧模式:使用共享内存进行同步

由各个任务组成的任务池共享内存;为了同步各个worker以及避免资源竞争,我们需要对任务池进行加锁保护:

    type Pool struct 
        Mu      sync.Mutex
        Tasks   []*Task
    

sync.Mutex:它用来在代码中保护临界区资源:同一时间只有一个go协程(goroutine)可以进入该临界区。

如果出现了同一时间多个go协程都进入了该临界区,则会产生竞争:Pool结构就不能保证被正确更新。在传统的模式中(经典的面向对象的语言中应用得比较多,比如C++,JAVA,C#),worker代码可能这样写:

func Worker(pool *Pool) 
    for 
        pool.Mu.Lock()
        // begin critical section:
        task := pool.Tasks[0]        // take the first task
        pool.Tasks = pool.Tasks[1:]  // update the pool of tasks
        // end critical section
        pool.Mu.Unlock()
        process(task)
    

这些worker有许多都可以并发执行;他们可以在go协程中启动。一个worker先将pool锁定,从pool获取第一项任务,再解锁和处理任务。加锁保证了同一时间只有一个go协程可以进入到pool中:一项任务有且只能被赋予一个worker。如果不加锁,则工作协程可能会在task:=pool.Tasks[0]发生切换,导致pool.Tasks=pool.Tasks[1:]结果异常:一些worker获取不到任务,而一些任务可能被多个worker得到。加锁实现同步的方式在工作协程比较少时可以工作的很好,但是当工作协程数量很大,任务量也很多时,处理效率将会因为频繁的加锁/解锁开销而降低。当工作协程数增加到一个阈值时,程序效率会急剧下降,这就成为了瓶颈。

待解决问题为多个协程同时从工作池中获取头部任务存在并发问题

新模式:使用通道

使用通道进行同步:使用一个通道接受需要处理的任务,一个通道接受处理完成的任务(及其结果)。worker在协程中启动,其数量N应该根据任务数量进行调整。

主线程扮演着Master节点角色,可能写成如下形式:

    func main() 
        pending, done := make(chan *Task), make(chan *Task)
        go sendWork(pending)       // put tasks with work on the channel
        for i := 0; i < N; i++    // start N goroutines to do work
            go Worker(pending, done)
        
        consumeWork(done)          // continue with the processed tasks
    

worker的逻辑比较简单:从pending通道拿任务,处理后将其放到done通道中:

    func Worker(in, out chan *Task) 
        for 
            t := <-in
            process(t)
            out <- t
        
    


这里并不使用锁:从通道得到新任务的过程没有任何竞争。随着任务数量增加,worker数量也应该相应增加,同时性能并不会像第一种方式那样下降明显。

在pending通道中存在一份任务的拷贝,第一个worker从pending通道中获得第一个任务并进行处理,这里并不存在竞争(对一个通道读数据和写数据的整个过程是原子性的)。某一个任务会在哪一个worker中被执行是不可知的,反过来也是。worker数量的增多也会增加通信的开销,这会对性能有轻微的影响。

从这个简单的例子中可能很难看出第二种模式的优势,但含有复杂锁运用的程序不仅在编写上显得困难,也不容易编写正确,使用第二种模式的话,就无需考虑这么复杂的东西了。

因此,第二种模式对比第一种模式而言,不仅性能是一个主要优势,而且还有个更大的优势:代码显得更清晰、更优雅。一个更符合go语言习惯的worker写法:

    func Worker(in, out chan *Task) 
        for 
            t := <-in
            process(t)
            out <- t
        
    

对于任何可以建模为Master-Worker范例的问题,一个类似于worker使用通道进行通信和交互、Master进行整体协调的方案都能完美解决。如果系统部署在多台机器上,各个机器上执行Worker协程,Master和Worker之间使用netchan或者RPC进行通信。

怎么选择是该使用锁还是通道?

通道是一个较新的概念,本节我们着重强调了在go协程里通道的使用,但这并不意味着经典的锁方法就不能使用。go语言让你可以根据实际问题进行选择:创建一个优雅、简单、可读性强、在大多数场景性能表现都能很好的方案。如果你的问题适合使用锁,也不要忌讳使用它。go语言注重实用,什么方式最能解决你的问题就用什么方式,而不是强迫你使用一种编码风格。下面列出一个普遍的经验法则:

使用锁的情景:

  • 访问共享数据结构中的缓存信息
  • 保存应用程序上下文和状态信息数据

使用通道的情景:

  • 与异步操作的结果进行交互
  • 分发任务
  • 传递数据所有权

当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。


惰性生成器的实现

生成器是指当被调用时返回一个序列中下一个值的函数,例如:

    generateInteger() => 0
    generateInteger() => 1
    generateInteger() => 2
    ....

生成器每次返回的是序列中下一个值而非整个序列;这种特性也称之为惰性求值:只在你需要时进行求值,同时保留相关变量资源(内存和cpu):这是一项在需要时对表达式进行求值的技术。例如,生成一个无限数量的偶数序列:要产生这样一个序列并且在一个一个的使用可能会很困难,而且内存会溢出!但是一个含有通道和go协程的函数能轻易实现这个需求。

在下面的例子中,我们实现了一个使用 int 型通道来实现的生成器。通道被命名为yield和resume,这些词经常在协程代码中使用。

package main
import (
    "fmt"
)
var resume chan int

func integers() chan int 
    yield := make(chan int)
    count := 0
    go func() 
        for 
            yield <- count
            count++
        
    ()
    return yield


func generateInteger() int 
    return <-resume


func main() 
    resume = integers()
    fmt.Println(generateInteger())  //=> 0
    fmt.Println(generateInteger())  //=> 1
    fmt.Println(generateInteger())  //=> 2    

有一个细微的区别是从通道读取的值可能会是稍早前产生的,并不是在程序被调用时生成的。如果确实需要这样的行为,就得实现一个请求响应机制。当生成器生成数据的过程是计算密集型且各个结果的顺序并不重要时,那么就可以将生成器放入到go协程实现并行化。但是得小心,使用大量的go协程的开销可能会超过带来的性能增益。

这些原则可以概括为:通过巧妙地使用空接口、闭包和高阶函数,我们能实现一个通用的惰性生产器的工厂函数BuildLazyEvaluator(这个应该放在一个工具包中实现)。

工厂函数需要一个函数和一个初始状态作为输入参数,返回一个无参、返回值是生成序列的函数。传入的函数需要计算出下一个返回值以及下一个状态参数。在工厂函数中,创建一个通道和无限循环的go协程。返回值被放到了该通道中,返回函数稍后被调用时从该通道中取得该返回值。每当取得一个值时,下一个值即被计算。在下面的例子中,定义了一个evenFunc函数,其是一个惰性生成函数:在main函数中,我们创建了前10个偶数,每个都是通过调用even()函数取得下一个值的。为此,我们需要在BuildLazyIntEvaluator函数中具体化我们的生成函数,然后我们能够基于此做出定义。

package main

import (
    "fmt"
)

type Any interface

type EvalFunc func(Any) (Any, Any)

func main() 
    evenFunc := func(state Any) (Any, Any) 
        os := state.(int)
        ns := os + 2
        return os, ns
    
    even := BuildLazyIntEvaluator(evenFunc, 0)
    for i := 0; i < 10; i++ 
        fmt.Printf("%vth even: %v\\n", i, even())
    


func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any 
    retValChan := make(chan Any)
    loopFunc := func() 
        var actState Any = initState
        var retVal Any
        for 
            retVal, actState = evalFunc(actState)
            retValChan <- retVal
        
    
    retFunc := func() Any 
        return <- retValChan
    
    go loopFunc()
    return retFunc


func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int 
    ef := BuildLazyEvaluator(evalFunc, initState)
    return func() int 
        return ef().(int)
    

输出:

0th even: 0
1th even: 2
2th even: 4
3th even: 6
4th even: 8
5th even: 10
6th even: 12
7th even: 14
8th even: 16
9th even: 18

提示:因为斐波那契数增长很迅速,使用uint64类型。 注:这种计算通常被定义为递归函数,但是在没有尾递归的语言中,例如go语言,这可能会导致栈溢出,但随着go语言中堆栈可扩展的优化,这个问题就不那么严重。这里的诀窍是使用了惰性求值。gccgo编译器在某些情况下会实现尾递归。


实现 Futures 模式

所谓Futures就是指:有时候在你使用某一个值之前需要先对其进行计算。这种情况下,你就可以在另一个处理器上进行该值的计算,到使用时,该值就已经计算完毕了。

Futures模式通过闭包和通道可以很容易实现,类似于生成器,不同地方在于Futures需要返回一个值。

参考条目文献给出了一个很精彩的例子:假设我们有一个矩阵类型,我们需要计算两个矩阵A和B乘积的逆,首先我们通过函数Inverse(M)分别对其进行求逆运算,再将结果相乘。如下函数InverseProduct()实现了如上过程:

func InverseProduct(a Matrix, b Matrix) 
    a_inv := Inverse(a)
    b_inv := Inverse(b)
    return Product(a_inv, b_inv)

在这个例子中,a和b的求逆矩阵需要先被计算。那么为什么在计算b的逆矩阵时,需要等待a的逆计算完成呢?显然不必要,这两个求逆运算其实可以并行执行的。换句话说,调用Product函数只需要等到a_inv和b_inv的计算完成。如下代码实现了并行计算方式:

func InverseProduct(a Matrix, b Matrix) 
    a_inv_future := InverseFuture(a)   // start as a goroutine
    b_inv_future := InverseFuture(b)   // start as a goroutine
    a_inv := <-a_inv_future
    b_inv := <-b_inv_future
    return Product(a_inv, b_inv)

InverseFuture函数以goroutine的形式起了一个闭包,该闭包会将矩阵求逆结果放入到future通道中:

func InverseFuture(a Matrix) chan Matrix 
    future := make(chan Matrix)
    go func() 
        future <- Inverse(a)
    ()
    return future

当开发一个计算密集型库时,使用Futures模式设计API接口是很有意义的。在你的包使用Futures模式,且能保持友好的API接口。此外,Futures可以通过一个异步的API暴露出来。这样你可以以最小的成本将包中的并行计算移到用户代码中。(参见参考文件18:http://www.golangpatterns.info/concurrency/futures


复用

典型的客户端/服务器(C/S)模式

客户端-服务器应用正是 goroutines 和 channels 的亮点所在。

客户端(Client)可以是运行在任意设备上的任意程序,它会按需发送请求(request)至服务器。服务器(Server)接收到这个请求后开始相应的工作,然后再将响应(response)返回给客户端。典型情况下一般是多个客户端(即多个请求)对应一个(或少量)服务器。例如我们日常使用的浏览器客户端,其功能就是向服务器请求网页。而Web服务器则会向浏览器响应网页数据。

使用Go的服务器通常会在协程中执行向客户端的响应,故而会对每一个客户端请求启动一个协程。一个常用的操作方法是客户端请求自身中包含一个通道,而服务器则向这个通道发送响应。

例如下面这个Request结构,其中内嵌了一个replyc通道。

type Request struct 
    a, b      int    
    replyc    chan int // reply channel inside the Request

或者更通俗的:

type Reply struct...
type Request struct
    arg1, arg2, arg3 some_type
    replyc chan *Reply

接下来先使用简单的形式,服务器会为每一个请求启动一个协程并在其中执行run()函数,此举会将类型为binOp的op操作返回的int值发送到replyc通道。

type binOp func(a, b int) int
func run(op binOp, req *Request) 
    req.replyc <- op(req.a, req.b)

server协程会无限循环以从chan *Request接收请求,并且为了避免被长时间操作所堵塞,它将为每一个请求启动一个协程来做具体的工作:

func server(op binOp, service chan *Request) 
    for 
        req := <-service; // requests arrive here  
        // start goroutine for request:        
        go run(op, req);  // don’t wait for op to complete    
    

server本身则是以协程的方式在startServer函数中启动:

func startServer(op binOp) chan *Request 
    reqChan := make(chan *Request);
    go server(op, reqChan);
    return reqChan;

startServer则会在main协程中被调用。

在以下测试例子中,100个请求会被发送到服务器,只有它们全部被送达后我们才会按相反的顺序检查响应:

func main() 
    adder := startServer(func(a, b int) int  return a + b )
    const N = 100
    var reqs [N]Request
    for i := 0; i < N; i++ 
        req := &reqs[i]
        req.a = i
        req.b = i + N
        req.replyc = make(chan int)
        adder <- req  // adder is a channel of requests
    
    // checks:
    for i := N - 1; i >= 0; i-- 
        // doesn’t matter what order
        if <-reqs[i].replyc != N+2*i 
            fmt.Println(“fail at”, i)
         else 
            fmt.Println(“Request “, i, “is ok!)
        
    
    fmt.Println(“done”)

输出:

Request 99 is ok!
Request 98 is ok!
...
Request 1 is ok!
Request 0 is ok!
done

这个程序仅启动了100个协程。然而即使执行100,000个协程我们也能在数秒内看到它完成。这说明了Go的协程是如何的轻量:如果我们启动相同数量的真实的线程,程序早就崩溃了。

完整示例如下:

package main
import "fmt"
type Request struct 
    a, b   int
    replyc chan int // reply channel inside the Request

type binOp func(a, b int) int
func run(op binOp, req *Request) 
    req.replyc <- op(req.a, req.b)

func server(op binOp, service chan *Request) 
    for 
        req := <-service // requests arrive here
        // start goroutine for request:
        go run(op, req) // don't wait for op
    

func startServer(op binOp) chan *Request 
    reqChan := make(chan *Request)
    go server(op, reqChan)
    return reqChan

func main() 
    adder := startServer(func(a, b int) int  return a + b )
    const N = 100
    var reqs [N]Request
    for i := 0; i < N; i++ 
        req := &reqs[i]
        req.a = i
        req.b = i + N
        req.replyc = make(chan int)
        adder <- req
    
    // checks:
    for i := N - 1; i >= 0; i--  // doesn't matter what order
        if <-reqs[i].replyc != N+2*i 
            fmt.Println("fail at", i)
         else 
            fmt.Println("Request ", i, " is ok!")
        
    
    fmt.Println("done")


卸载(Teardown):通过信号通道关闭服务器

在上一个版本中server在main函数返回后并没有完全关闭,而被强制结束了。为了改进这一点,我们可以提供一个退出通道给server:

func startServer(op binOp) (service chan *Request, quit chan bool) 
    service = make(chan *Request)
    quit = make(chan bool)
    go server(op, service, quit)
    return service, quit

server函数现在则使用select在service通道和quit通道之间做出选择:

func server(op binOp, service chan *request, quit chan bool) 
    for 
        select 
            case req := <-service:
                go run(op, req) 
            case <-quit:
                return   
        
    

当quit通道接收到一个true值时,server就会返回并结束。

在main函数中我们做出如下更改:

adder, quit := startServer(func(a, b int) int  return a + b )

在main函数的结尾处我们放入这一行:quit <- true

package main
import "fmt"
type Request struct 
    a, b   int
    replyc chan int // reply channel inside the Request

type binOp func(a, b int) int
func run(op binOp, req *Request) 
    req.replyc <- op(req.a, req.b)

func server(op binOp, service chan *Request, quit chan bool) 
    for 
        select 
        case req := <-service:
            go run(op, req)
        case <-quit:
            return
        
    

func startServer(op binOp) (service chan *Request, quit chan bool) 
    service = make(chan *Request)
    quit = make(chan bool)
    go server(op, service, quit)
    return service, quit

func main() 
    adder, quit := startServer(func(a, b int) int  return a + b )
    const N = 100
    var reqs [N]Request
    for i := 0; i < N; i++ 
        req := &reqs[i]
        req.a = i
        req.b = i + N
        req.replyc = make(chan int)
        adder <- req
    
    // checks:
    for i := N - 1; i >= 0; i--  // doesn't matter what order
        if <-reqs[i].replyc != N+2*i 
            fmt.Println("fail at", i)
         else 
            fmt.Println("Request ", i, " is ok!")
        
    
    quit <- true
    fmt.Println("done")


限制同时处理的请求数

使用带缓冲区的通道很容易实现这一点,其缓冲区容量就是同时处理请求的最大数量。程序max_tasks.go虽然没有做什么有用的事但是却包含了这个技巧:超过MAXREQS的请求将不会被同

以上是关于GoLang协程与通道---下的主要内容,如果未能解决你的问题,请参考以下文章

GoLang协程与通道---中

协程与Channels (CSP: Kotlin, Golang)

协程与通道

Go协程与协程池

Go协程与协程池

2020-08-20:GO语言中的协程与Python中的协程的区别?