Go基础并发编程

Posted Ricky_0528

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go基础并发编程相关的知识,希望对你有一定的参考价值。

文章目录

1. 并发模型

任何语言的并行,到操作系统层面,都是内核线程的并行。同一个进程内的多个线程共享系统资源,进程的创建、销毁、切换比线程大很多。从进程到线程再到协程, 其实是一个不断共享, 不断减少切换成本的过程。


协程线程
创建数量轻松创建上百万个协程而不会导致系统资源衰竭通常最多不能超过1万个
内存占用初始分配4k堆栈,随着程序的执行自动增长删除创建线程时必须指定堆栈且是固定的,通常以M为单位
切换成本协程切换只需保存三个寄存器,耗时约200纳秒线程切换需要保存几十个寄存器,耗时约1000纳秒
调度方式非抢占式,由Go runtime主动交出控制权(对于开发者而言是抢占式)在时间片用完后,由 CPU 中断任务强行将其调度走,这时必须保存很多信息
创建销毁goroutine因为是由Go runtime负责管理的,创建和销毁的消耗非常小,是用户级的创建和销毁开销巨大,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池

查看逻辑核心数

fmt.Println(runtime.NumCPU())

Go语言的MPG并发模型

M(Machine)对应一个内核线程;P(Processor)虚拟处理器,代表M所需的上下文环境,是处理用户级代码逻辑的处理器;P的数量由环境变量中的GOMAXPROCS决定,默认情况下就是核数;G(Goroutine)本质上是轻量级的线程,G0正在执行,其他G在等待。M和内核线程的对应关系是确定的。G0阻塞(如系统调用)时,P与G0、M0解绑,P被挂到其他M上,然后继续执行G队列。G0解除阻塞后,如果有空闲的P,就绑定M0并执行G0;否则G0进入全局可运行队列(runqueue)。P会周期性扫描全局runqueue,使上面的G得到执行;如果全局runqueue为空,就从其他P的等待队列里偷一半G过来。

2. Goroutine的使用

启动协程的两种常见方式:

func Add(a, b int) int 
    fmt.Println("Add")
    return a + b

go Add(2, 4)
go func(a, b int) int 
	fmt.Println("add")
	return a + b
(2, 4)

优雅地等子协程结束:

wg := sync.WaitGroup
wg.Add(10) // 加10
for i := 0; i < 10; i++ 
	// 开10个子协程
	go func(a, b int) 
		defer wg.Done() // 减1
		//do something
	(i, i+1)

wg.Wait() // 等待减为0

父协程结束后,子协程并不会结束,main协程结束后,所有协程都会结束
向协程内传递变量

package main

import (
	"fmt"
	"time"
)

func main() 
	arr := []int1, 2, 3, 4
	for _, v := range arr 
		go func() 
			fmt.Printf("%d\\t", v) // 用的是协程外面的全局变量v,输出4 4 4 4
		()
	
	time.Sleep(time.Duration(1) * time.Second)
	fmt.Println()
	for _, v := range arr 
		go func(value int) 
			fmt.Printf("%d\\t", value) // 输出1 4 2 3
		(v) // 把v的副本传到协程内部
	
	time.Sleep(time.Duration(1) * time.Second)
	fmt.Println()

有时候需要确保在高并发的场景下有些事情只执行一次,比如加载配置文件、关闭管道等

var resource map[string]string
var loadResourceOnce sync.Once func LoadResource() 
	loadResourceOnce.Do(func() 
		resource["1"] = "A"
	)

单例模式

type Singleton struct 
var singleton *Singleton
var singletonOnce sync.Once
func GetSingletonInstance() *Singleton 
	singletonOnce.Do(func() 
		singleton = &Singleton
	)
	return singleton

何时会发生panic:

  • 运行时错误会导致panic,比如数组越界、除0
  • 程序主动调用panic(error)

panic会执行什么:

  • 逆序执行当前goroutine的defer链(recover从这里介入)
  • 打印错误信息和调用堆栈
  • 调用exit(2)结束整个进程

关于defer:

  • defer在函数退出前被调用,注意不是在代码的return语句之前执行,因为return语句不是原子操作
  • 如果发生panic,则之后注册的defer不会执行
  • defer服从先进后出原则,即一个函数里如果注册了多个defer,则按注册的逆序执行
  • defer后面可以跟一个匿名函数
func goo(x int) int 
	fmt.Printf("x=%d\\n", x)
	return x


func foo(a, b int, p bool) int 
	c := a*3 + 9
	// defer是先进后出,即逆序执行
	defer fmt.Println("first defer")
	d := c + 5
	defer fmt.Println("second defer")
	e := d / b // 如果发生panic,则后面的defer不会执行
	if p 
		panic(errors.New("my error")) // 主动panic
	
	defer fmt.Println("third defer")
	return goo(e) // defer是在函数临退出前执行,不是在代码的return语句之前执行,因为return语句不是原子操作

recover会阻断panic的执行

func soo(a, b int) 
	defer func() 
		// recover必须在defer中才能生效
		if err := recover(); err != nil 			
            fmt.Printf("soo函数中发生了panic:%s\\n", err)
		
	()
	panic(errors.New("my error"))

3. Channel的同步与异步

很多语言通过共享内存来实现线程间的通信,通过加锁来访问共享数据,如数组、map或结构体,go语言也实现了这种并发模型。

CSP(communicating sequential processes)讲究的是“以通信的方式来共享内存”,在go语言里channel是这种模式的具体实现。

异步管道:

asynChann := make(chan int, 8)

channel底层维护一个环形队列(先进先出),make初始化时指定队列的长度。队列满时,写阻塞;队列空时,读阻塞。sendx指向下一次写入的位置, recvx指向下一次读取的位置。 recvq维护因读管道而被阻塞的协程,sendq维护因写管道而被阻塞的协程。

同步管道可以认为队列容量为0,当读协程和写协程同时就绪时它们才会彼此帮对方解除阻塞

syncChann := make(chan int)

channel仅作为协程间同步的工具,不需要传递具体的数据,管道类型可以用struct。空结构体变量的内存占用为0,因此struct类型的管道比bool类型的管道还要省内存

sc := make(chan struct)
sc <- struct

关于channel的死锁与阻塞:

  • Channel满了,就阻塞写;Channel空了,就阻塞读
  • 阻塞之后会交出cpu,去执行其他协程,希望其他协程能帮自己解除阻塞
  • 如果阻塞发生在main协程里,并且没有其他子协程可以执行,那就可以确定“希望永远等不来”,自已把自己杀掉,报一个fatal error:deadlock出来
  • 如果阻塞发生在子协程里,就不会发生死锁,因为至少main协程是一个值得等待的“希望”,会一直等(阻塞)下去
package main

import (
	"fmt"
	"time"
)

func main() 
	ch := make(chan struct, 1)
	ch <- struct // 有1个缓冲可以用,无需阻塞,可以立即执行
	// 子协程1
	go func() 
		time.Sleep(5 * time.Second) // sleep一个很长的时间
		<-ch // 如果把本行代码注释掉,main协程5秒钟后会报fatal error
		fmt.Println("sub routine 1 over")
	()

	ch <- struct // 由于子协程1已经启动,寄希望于子协程1帮自己解除阻塞,所以会一直等子协程1执行结束,如果子协程1执行结束后没帮自己解除阻塞,则希望完全破灭,报出deadlock
	fmt.Println("send to channel in main routine")
	// 子协程2
	go func() 
		time.Sleep(2 * time.Second)
		ch <- struct // channel已满,子协程2会一直阻塞在这一行
		fmt.Println("sub routine 2 over")
	()
	time.Sleep(3 * time.Second)
	fmt.Println("main routine exit")

关闭channel

  • 只有当管道关闭时,才能通过range遍历管道里的数据,否则会发生fatal error
  • 管道关闭后读操作会立即返回,如果缓冲已空会返回“0值”
  • ele, ok := <-ch ok==true代表ele是管道里的真实数据
  • 向已关闭的管道里send数据会发生panic
  • 不能重复关闭管道,不能关闭值为nil的管道,否则都会panic
package main

import (
	"fmt"
	"time"
)

var cloch = make(chan int, 1)
var cloch2 = make(chan int, 1)

func traverseChannel() 
	for ele := range cloch 
		fmt.Printf("receive %d\\n", ele)
	
	fmt.Println()


func traverseChannel2() 
	for 
		if ele, ok := <-cloch2; ok  // ok==true代表管道还没有close
			fmt.Printf("receive %d\\n", ele)
		 else 
			// 管道关闭后,读操作会立即返回“0值”
			fmt.Printf("channel have been closed, receive %d\\n", ele)
			break
		
	


func main() 
	cloch <- 1
	close(cloch) // 如果不close就直接通过range遍历管道,会发生fatal error: all goroutines are asleep - deadlock!
	traverseChannel()
	fmt.Println("==================")
	go traverseChannel2()
	cloch2 <- 1
	close(cloch2)
	time.Sleep(10 * time.Millisecond)

channel在并发编程中有多种玩法,经常用channel来实现协程间的同步

package main

import (
	"fmt"
	"time"
)

func upstream(ch chan struct) 
	time.Sleep(15 * time.Millisecond)
	fmt.Println("一个上游协程执行结束")
	ch <- struct


func downstream(ch chan struct) 
	<-ch
	fmt.Println("下游协程开始执行")


func main() 
	upstreamNum := 4 // 上游协程的数量
	downstreamNum := 5 // 下游协程的数量

	upstreamCh := make(chan struct, upstreamNum)
	downstreamCh := make(chan struct, downstreamNum)

	// 启动上游协程和下游协程,实际下游协程会先阻塞
	for i := 0; i < upstreamNum; i++ 
		go upstream(upstreamCh)
	
	for i := 0; i < downstreamNum; i++ 
		go downstream(downstreamCh)
	

	// 同步点
	for i := 0; i < upstreamNum; i++ 
		<-upstreamCh
	

	// 通过管道让下游协程开始执行
	for i := 0; i < downstreamNum; i++ 
		downstreamCh <- struct
	

	time.Sleep(10 * time.Millisecond) // 等下游协程执行结束

4. 并发安全性

多协程并发修改同一块内存,产生资源竞争,go run或go build时添加-race参数检查资源竞争情况
n++不是原子操作,并发执行时会存在脏写,n++分为3步:取出n,加1,结果赋给n,测试时需要开1000个并发协程才能观察到脏写

func atomic.AddInt32(addr *int32, delta int32) (new int32)
func atomic.LoadInt32(addr *int32) (val int32)

把n++封装成原子操作,解除资源竞争,避免脏写

var lock sync.RWMutex // 声明读写锁,无需初始化
lock.Lock() lock.Unlock() // 加写锁和释放写锁
lock.RLock() lock.RUnlock()	// 加读锁和释放读锁

任意时刻只可以加一把写锁,且不能加读锁。没加写锁时,可以同时加多把读锁,读锁加上之后不能再加写锁

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var n int32 = 0
var lock sync.RWMutex

func inc1() 
	n++ // n++不是原子操作,它分为3步:取出n,加1,结果赋给n


func inc2() 
	atomic.AddInt32(&n, 1) // 封装成原子操作


func inc3() 
	lock.Lock() //加写锁
	n++  // 任一时刻,只有一个协程能进入临界区域
	lock.Unlock() // 释放写锁


func main() 
	const P = 1000 // 开大量协程才能把脏写问题测出来
	wg := sync.WaitGroup
	wg.Add(P)
	for i := 0; i < P; i++ 
		go func() 
			defer wg.Done()
			inc1()
		()
	
	wg.Wait()
	fmt.Printf("finally n=%d\\n", n) // 多运行几次,n经常不等于1000
	fmt.Println("===========================")
	n = 0 // 重置n
	wg = sync.WaitGroup
	wg.Add(P)
	for i := 0; i < P; i++ 
		go func() 
			defer wg.Done()
			inc2()
		()
	
	wg.Wait()
	fmt.Printf("finally n=%d\\n", atomic.LoadInt32(&n))
	fmt.Println("===========================")
	n = 0 // 重置n
	wg = sync.WaitGroup
	wg.Add(P)
	for i := 0; i < P; i++ 
		go func() 
			defer wg.Done()
			inc3()
		()
	
	wg.Wait()
	lock.RLock() // 加读锁,当写锁被其他协程持有时,加读锁操作将被阻塞;否则,如果其他协程持有读锁,加读锁操作不会被阻塞
	fmt.Printf("finally n=%d\\n", n)
	lock.RUnlock() // 释放读锁
	fmt.Println("===========================")

数组、slice、struct允许并发修改(可能会脏写),并发修改map有时会发生panic,如果需要并发修改map请使用sync.Map

package main

import (
	"fmt"
	"sync"
)

type Student struct 
	Name string
	Age  int32


var arr = [10]int
var m = sync.Map

func main() 
	wg := sync.WaitGroup
	wg.Add(2)
	// 写偶数位
	go func() 
		defer wg.Done()
		for i := 0; i < len(arr); i += 2 
			arr[i] = 0
		
	()
	// 写奇数位
	go func() 
		defer wg.Done()
		for i := 1; i < len(arr); i += 2 
			arr[i] = 1
		
	()
	wg.Wait()
	fmt.Println(arr) // 输出[0 1 0 1 0 1 0 1 0 1]
	fmt.Println("=======================")
	wg.Add(2)
	var stu Student
	go func() 
		defer wg.Done()
		stu.Name = "Fred"
	()
	go func() 
		defer wg.Done()
		stu.Age = 20
	()
	wg.Wait()
	fmt.Printf("%s %d\\n", stu.Name, stu.Age)
	fmt.Println("=======================")
	wg.Add(2)
	go func() 
		defer wg.Done()
		m.Store("k1", "v1")
	()
	go func() 
		defer wg.Done()
		m.Store1.5 Go微服务实战(Go语言基础) --- 并发编程

Go基础并发编程

Go基础并发编程

Go语言基础之并发

go 并发编程

Go语言基础之并发