go channel 也是通过共享内存和互斥锁来实现通信的

Posted douyacun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go channel 也是通过共享内存和互斥锁来实现通信的相关的知识,希望对你有一定的参考价值。


go语言提倡的并发模型就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

目录

mind map结构体调用栈makesendclosereceive

mind map


结构体

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
  // buf数组,这里存储的就是发送的数据
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
  // chan是否关闭
    closed   uint32
    elemtype *_type // element type
  // 数组写过队列应该明白就是为了让数组能转起来
    sendx    uint   // send index
    recvx    uint   // receive index
    // 这里维护等待 发送/接收 goroutine
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex // 互斥锁,buf数组并不是线程安全的
}

调用栈

下面的内容都是源码分析不想深入的同学,就不要看了。chan有4个动作,相应的代码位置:

  1. make  -> runtime/chan.makechan

  2. send -> runtime/chan.chansend

  3. recieve -> runtime/chan.chanrecv

  4. closed -> runtime/chan.closechan

make

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // compiler checks this but be safe.
  // 这里是编译器检测
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:// 无缓存channel
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, niltrue))// 无缓存channel就没必要申请buf
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:// 有缓存channel
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
    // 这里提到chan最好不要包含指针传递,并发的话会引起race
        c = (*hchan)(mallocgc(hchanSize+mem, niltrue))// 结构体的大小加上buf的大小
        c.buf = add(unsafe.Pointer(c), hchanSize)
    // 这里画一下内存大小,chan struct中的buf指向就是后面多申请的这一块buf内存
    // |chan struct|buf...|
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

chan通道实际也是通过共享内存和互斥锁实现

chan最好不要传递指针

send

无缓冲channel send

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 内部检查,避免一些低级的错误发生
    if c == nil {
        if !block {
            return false
        }
        gopark(nilnil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    if debugChan {
        print("chansend: chan=", c, "\n")
    }
    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
  // 没有recv goroutine,就无法send
  // channel buf 满了也是无法再次发送  todo::做着的目的是为了啥
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
  // buf array共享内存,互斥锁来避免数据竞争
    lock(&c.lock)
    // send on closed channel 这句话应该见到的挺多的了
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
  // 有goroutine等待接收channel,不需要进入buf,直接发送给等待接收的goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3// 这里是真正发送给接收线程的操作
        return true
    }
  // buf不满
    if c.qcount < c.dataqsiz {
        // chanbuf 获取当前,发送索引的内存地址
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 这里是memmove(to, from, elem), 就是将ep(elem)数据写入到发送所有的地址
        typedmemmove(c.elemtype, qp, ep)
    // 接下来的操作就是:如何用数组来模拟一个队列
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
    // 释放锁
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }
    // buf满了,需要阻塞发送的goroutine
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
  // 这上面就是阻塞并序列化当前goroutine,放到等待发送的队列中,并释放锁
    c.sendq.enqueue(mysg)
  // 当前goroutine的运行状态:Grunning -> Gwaiting
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
  // keepalive可以确保ep在此之前不会被gc回收
    KeepAlive(ep)
    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
  // 释放goroutine
    releaseSudog(mysg)
    return true
}

close

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    // channel 加锁
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
  // 标记channel已经关闭了
    c.closed = 1
    // 这里声明一个链表,收集send/recv队列中的G,方便
    var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
    // glist收集所有等待接收的goroutine
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
    // // glist收集所有等待发送的goroutine
        glist.push(gp)
    }
  // channel解锁
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
    // glist中的所有goroutine从Gwaiting -> Grunable 
        goready(gp, 3)
    }
}

receive

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 这一块和发送的时候是一样的属于编译器的检查
    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }
    if c == nil {
        if !block {
            return
        }
        gopark(nilnil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // channel加锁
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return truefalse
    }
    // 给接收者发送element
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return truetrue
    }
    // buf不为空
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
      // 将buf c.recvx位置的数据copy到ep内存上
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
    // 等价 c.recvx = (c.recvx + 1) % c.dataqsiz
        c.recvx++ 
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
    // channel解锁
        unlock(&c.lock)
        return truetrue
    }

    if !block {
        unlock(&c.lock)
        return falsefalse
    }

    // no sender available: block on this channel.
  // 没有接收者,阻塞当前goroutine,压入c.recvq
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
  // 当前goroutine的状态: Grunning -> Gwaiting
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
  // 释放当前groutine
    releaseSudog(mysg)
    return true, !closed
}

以上是关于go channel 也是通过共享内存和互斥锁来实现通信的的主要内容,如果未能解决你的问题,请参考以下文章

十.Go并发编程--channel使用

Go语言并发编程

[golang] channel通道

从鹅厂实例出发!分析Go Channel底层原理

go channel 概述

云原生时代崛起的编程语言Go并发编程实战