Go语言之Go语言并发
Posted heych
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言之Go语言并发相关的知识,希望对你有一定的参考价值。
Go 语言并发
Golang从语言层面就对并发提供了支持,而goruntine是Go语言并发设计的核心。
Go语言的并发机制运用起来非常舒适,在启动并发的方式上直接添加了语言级的关键字就可以实现,和其他编程语言相比更加轻量。
进程&线程
A、进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
B、线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
C、一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。
并发&并行
A、多线程程序在一个核的cpu上运行,就是并发。
B、多线程程序在多个核的cpu上运行,就是并行。
并发不是并行:
并发主要由切换时间片来实现"同时"运行,并行则是直接利用多核实现多线程的运行,Go程序可以设置使用核数,以发挥多核计算机的能力。
协程&线程
协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
线程:一个线程上可以跑多个协程,协程是轻量级的线程。
Goroutine 介绍
goroutine 只是由官方实现的超级"线程池"。每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是Go语言高并发的根本原因。
goroutine 奉行通过通信来共享内存,而不是共享内存来通信。只需在函数调用语句前添加 go 关键字,就可创建并发执行单元。开发人员无需了解任何执行细节,调度器会自动将其安排到合适的系统线程上执行。goroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。
事实上,入口函数 main 就以 goroutine 运行。另有与之配套的 channel 类型,用以实现 "以通讯来共享内存" 的 CSP 模式。
goroutine 是通过 Go 的 runtime管理的一个线程管理器
package main
import (
"fmt"
"time"
)
func main() {
go func() {
fmt.Println("hello word")
}()
time.Sleep(1 * time.Second)
}
进入 main 函数开启一个 goroutine 运行匿名函数函数体内容:fmt.Println("Hello, World!") 。主线程执行 time.Sleep(1 * time.Second) 等待 1 秒。goroutine 执行完毕回到主线程,主线程的sleep 完成结束程序。
注意:若去掉 time.Sleep(1 * time.Second) 这段代码,进入 main 函数开启一个 goroutine,没等 goroutine 运行匿名函数函数体内容,主线程已经完成结束程序。
Go语言Chan应用
Channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。
Channel 是先进先出,线程安全的,多个goroutine同时访问,不需要加锁。
chan 阻塞
我们定义的管道 intChan 容量是5,开启 goroutine 写入10条数据,在写满5条数据时会阻塞,而 read() 每秒会从 intChan 中读取一条,然后write() 再会写入一条数据。
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
fmt.Println("write data:", i)
}
}
func read(ch chan int) {
for {
i := <-ch
fmt.Println("read data:", i)
time.Sleep(time.Second)
}
}
func main() {
intChan := make(chan int, 5)
go write(intChan)
go read(intChan)
time.Sleep(10 * time.Second)
}
同步模式
默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。
package main
import "fmt"
func main() {
data := make(chan string) // 数据交换队列
exit := make(chan bool) // 退出通知
go func() {
for d := range data { // 从队列迭代接收数据,直到 close 。
fmt.Println(d)
}
fmt.Println("received over")
exit <- true // 发出退出通知。
}()
data <- "oldboy" // 发送数据。
data <- "Linux"
data <- "GOlang"
data <- "python"
close(data) // 关闭队列。
fmt.Println("send over")
<-exit // 等待退出通知。
}
异步模式
异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。
通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小。
package main
import "fmt"
func main() {
data := make(chan string, 3) // 缓冲区可以存储 3 个元素
exit := make(chan bool)
data <- "old boy" // 在缓冲区未满前,不会阻塞。
data <- "python"
data <- "linux"
go func() {
for d := range data { // 在缓冲区未空前,不会阻塞。
fmt.Println(d)
}
// 表示读取出data通道中数据
exit <- true
}()
data <- "java" // 如果缓冲区已满,阻塞。
data <- "C"
close(data)
<-exit
}
chan 选择
如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。
用 select 实现超时控制:
package main
import (
"fmt"
"time"
)
func main() {
exit := make(chan bool)
intChan := make(chan int, 2)
strChan := make(chan string, 2)
go func() {
select {
case vi := <-intChan:
fmt.Println(vi)
case vs := <-strChan:
fmt.Println(vs)
case <-time.After(time.Second * 3):
fmt.Println("timeout.")
}
exit <- true
}()
// intChan <- 100 // 注释掉,引发 timeout。
// strChan <- "oldboy"
<-exit
}
在循环中使用 select default case 需要小心,避免形成洪水。
简单工厂模式
用简单工厂模式打包并发任务和 channel。
package main
import (
"fmt"
"math/rand"
"time"
)
func NewTest() chan int {
c := make(chan int)
rand.Seed(time.Now().UnixNano())
go func() {
time.Sleep(time.Second)
c <- rand.Int()
}()
return c
}
func main() {
t := NewTest()
fmt.Println(<-t) // 等待 goroutine 结束返回。
}
Go 语言WaitGroup
WaitGroup能够一直等到所有的goroutine执行完成,并且阻塞主线程的执行,直到所有的goroutine执行完成。
WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。
Add:添加或者减少等待goroutine的数量;
Done:相当于Add(-1);
Wait:执行阻塞,直到所有的WaitGroup数量变成 0;
WaitGroup用于线程同步,WaitGroup等待一组线程集合完成,才会继续向下执行。 主线程(goroutine)调用Add来设置等待的线程(goroutine)数量。 然后每个线程(goroutine)运行,并在完成后调用Done。 同时,Wait用来阻塞,直到所有线程(goroutine)完成才会向下执行。
WaitGroup实例如下:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
// defer wg.Done()
defer wg.Add(-1)
EchoNum(n)
}(i)
}
wg.Wait()
}
func EchoNum(i int) {
time.Sleep(time.Second)
fmt.Println(i)
}
程序中将每次循环的数量 sleep 1 秒钟后输出。如果程序不使用WaitGroup,将不会输出结果。因为goroutine还没执行完,主线程已经执行完毕。
注掉的 defer wg.Done() 和 defer wg.Add(-1) 作用一样。
WaitGroup应用
一、用 channel 实现信号量 (semaphore)。
package main
import (
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(3) //增加三个线程
sem := make(chan int, 1)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done() //减少一个线程
sem <- 1 // 向 sem 发送数据,阻塞或者成功。
for x := 0; x < 3; x++ {
println(id, x)
}
<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据
}(i)
}
wg.Wait()
}
D:goprogramgosrcday10
λ go run test.go
0 0
0 1
0 2
1 0
1 1
1 2
2 0
2 1
2 2
二、用 closed channel 发出退出通知。
package main
import (
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
exit := make(chan bool)
for i := 0; i < 2; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
task := func() {
println(n, time.Now().String())
time.Sleep(1 * time.Second)
}
for {
select {
case <-exit: // closed channel 不会阻塞,因此可用作退出通知。
return
default: // 执行正常任务。
task()
}
}
}(i)
}
time.Sleep(time.Second * 3) // 让测试 goroutine 运行一会。
close(exit) // 发出退出通知。
wg.Wait()
}
WaitGroup陷阱
一、add 数量小于done数量导致 WaitGroup数量为负数
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
oldboy := func() {
time.Sleep(time.Second)
fmt.Println("The old boy welcomes you.")
wg.Done()
}
go oldboy()
go oldboy()
go oldboy()
time.Sleep(time.Second * 3)
wg.Wait()
}
运行错误:
panic: sync: negative WaitGroup counter
二、add 数量大于 done 数量造成 deadlock
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(4)
oldboy := func() {
time.Sleep(time.Second)
fmt.Println("The old boy welcomes you.")
wg.Done()
}
go oldboy()
go oldboy()
go oldboy()
time.Sleep(time.Second * 3)
wg.Wait()
}
运行错误:
fatal error: all goroutines are asleep - deadlock!
三、跳过 add 和 Done 操作,直接执行 Wait
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
go func(wg sync.WaitGroup, i int) {
wg.Add(1)
fmt.Printf("i=>%d
", i)
wg.Done()
}(wg, i)
}
wg.Wait()
fmt.Println("exit")
}
WaitGroup 同步的是 goroutine, 而上面的代码却在 goroutine 中进行 Add(1) 操作。因此,可能在这些 goroutine 还没来得及 Add(1) 就已经执行 Wait 操作了。
四、WaitGroup 拷贝传值问题
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg sync.WaitGroup, i int) {
fmt.Printf("i=>%d
", i)
wg.Done()
}(wg, i)
}
wg.Wait()
}
运行错误:
fatal error: all goroutines are asleep - deadlock!
wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的,因此 Wait 就死锁了。
正确代码实例如下:
package main
import (
"fmt"
"sync"
)
func main() {
wg := new(sync.WaitGroup)
// wg := &sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup, i int) {
fmt.Printf("i=>%d
", i)
wg.Done()
}(wg, i)
}
wg.Wait()
}
Go 语言runtime
runtime包提供Go语言运行时的系统交互的操作,例如控制goruntine的功能。
调度器不能保证多个 goroutine 执行次序,且进程退出时不会等待它们结束。
默认情况下,进程启动后仅允许一个系统线程服务于 goroutine。可使用环境变量或标准库函数 runtime.GOMAXPROCS 修改,让调度器用多个线程实现多核并行,而不仅仅是并发。
runtime包常用方法
const GOOS string = theGoos
GOOS是可执行程序的目标操作系统(将要在该操作系统的机器上执行):darwin、freebsd、linux等。
func Gosched()
Gosched使当前go程放弃处理器,以让其它go程运行。它不会挂起当前go程,因此当前go程未来会恢复执行。
func NumCPU() int
NumCPU返回本地机器的逻辑CPU个数。
func GOROOT() string
GOROOT返回Go的根目录。如果存在GOROOT环境变量,返回该变量的值;否则,返回创建Go时的根目录。
func GOMAXPROCS(n int) int
GOMAXPROCS设置可同时执行的最大CPU数,并返回先前的设置。 若 n < 1,它就不会更改当前设置。本地机器的逻辑CPU数可通过 NumCPU 查询。本函数在调度程序优化后会去掉。
func Goexit()
Goexit终止调用它的go程。其它go程不会受影响。Goexit会在终止该go程前执行所有defer的函数。
在程序的main go程调用本函数,会终结该go程,而不会让main返回。因为main函数没有返回,程序会继续执行其它的go程。如果所有其它go程都退出了,程序就会崩溃。
func NumGoroutine() int
NumGoroutine返回当前存在的Go程数。
runtime包应用
一、查看机器的逻辑CPU个数、Go的根目录、操作系统
package main
import "runtime"
func main() {
println("cpu:", runtime.NumCPU())
println("go:", runtime.GOROOT())
println("操作系统:", runtime.GOOS)
}
D:goprogramgosrcday10
λ go run test.go
cpu: 4
go: D:gogo
操作系统: windows
二、GOMAXPROCS 设置golang运行的cpu核数
Golang 默认所有任务都运行在一个 cpu 核里,如果要在 goroutine 中使用多核,可以使用 runtime.GOMAXPROCS 函数修改,当参数小于 1 时使用默认值。
package main
import (
"fmt"
"runtime"
)
var (
signal = false
)
func oldboy() {
signal = true
}
func init() {
runtime.GOMAXPROCS(1)
}
func main() {
go oldboy()
for {
if signal {
break
}
}
fmt.Println("end")
}
上述代码单核执行如果for前面或者中间不延迟,主线程不会让出CPU,导致异步的线程无法执行,从而无法设置signal的值,从而出现死循环。
运行的cpu核数设置成2核
package main
import (
"fmt"
"runtime"
)
var (
signal = false
)
func oldboy() {
signal = true
}
func init() {
runtime.GOMAXPROCS(2)
}
func main() {
go oldboy()
for {
if signal {
break
}
}
fmt.Println("end")
}
运行结果:
end
三、Gosched 让当前的 goroutine 让出 CPU
这个函数的作用是让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞。当前的 goroutine 不会挂起,当前的 goroutine 程未来会恢复执行。
runtime.Gosched()用于让出CPU时间片。这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。
package main
import (
"runtime"
"sync"
)
func main() {
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
for i := 0; i < 6; i++ {
println(i)
runtime.Gosched()
}
defer wg.Done()
}()
for i := 0; i < 6; i++ {
wg.Add(1)
go func() {
defer wg.Done()
println("Hello.Golang!")
}()
}
wg.Wait()
}
D:goprogramgosrcday10
λ go run test.go
0
1
2
3
4
5
Hello.Golang!
Hello.Golang!
Hello.Golang!
Hello.Golang!
Hello.Golang!
Hello.Golang!
四、Goexit 终止当前 goroutine 执行
调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
defer fmt.Println("A.defer")
func() {
defer fmt.Println("B.defer")
runtime.Goexit() // 终止当前 goroutine
fmt.Println("B") // 不会执行
}()
fmt.Println("A") // 不会执行
}()
wg.Wait()
}
B.defer
A.defer
以上是关于Go语言之Go语言并发的主要内容,如果未能解决你的问题,请参考以下文章