Go语言之Go语言并发

Posted heych

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言之Go语言并发相关的知识,希望对你有一定的参考价值。

Go 语言并发

Golang从语言层面就对并发提供了支持,而goruntine是Go语言并发设计的核心。

Go语言的并发机制运用起来非常舒适,在启动并发的方式上直接添加了语言级的关键字就可以实现,和其他编程语言相比更加轻量。

进程&线程

A、进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

B、线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

C、一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

并发&并行

A、多线程程序在一个核的cpu上运行,就是并发。

B、多线程程序在多个核的cpu上运行,就是并行。

并发不是并行:

并发主要由切换时间片来实现"同时"运行,并行则是直接利用多核实现多线程的运行,Go程序可以设置使用核数,以发挥多核计算机的能力。

协程&线程

协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。

线程:一个线程上可以跑多个协程,协程是轻量级的线程。

Goroutine 介绍

goroutine 只是由官方实现的超级"线程池"。每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是Go语言高并发的根本原因。

goroutine 奉行通过通信来共享内存,而不是共享内存来通信。只需在函数调用语句前添加 go 关键字,就可创建并发执行单元。开发人员无需了解任何执行细节,调度器会自动将其安排到合适的系统线程上执行。goroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。

事实上,入口函数 main 就以 goroutine 运行。另有与之配套的 channel 类型,用以实现 "以通讯来共享内存" 的 CSP 模式。

goroutine 是通过 Go 的 runtime管理的一个线程管理器

package main

import (
	"fmt"
	"time"
)

func main() {
	go func() {
		fmt.Println("hello word")
	}()
	time.Sleep(1 * time.Second)
}

进入 main 函数开启一个 goroutine 运行匿名函数函数体内容:fmt.Println("Hello, World!") 。主线程执行 time.Sleep(1 * time.Second) 等待 1 秒。goroutine 执行完毕回到主线程,主线程的sleep 完成结束程序。

注意:若去掉 time.Sleep(1 * time.Second) 这段代码,进入 main 函数开启一个 goroutine,没等 goroutine 运行匿名函数函数体内容,主线程已经完成结束程序。

Go语言Chan应用

Channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。
Channel 是先进先出,线程安全的,多个goroutine同时访问,不需要加锁。

chan 阻塞

我们定义的管道 intChan 容量是5,开启 goroutine 写入10条数据,在写满5条数据时会阻塞,而 read() 每秒会从 intChan 中读取一条,然后write() 再会写入一条数据。

package main

import (
	"fmt"
	"time"
)

func write(ch chan int) {
	for i := 0; i < 10; i++ {
		ch <- i
		fmt.Println("write data:", i)
	}
}
func read(ch chan int) {
	for {
		i := <-ch
		fmt.Println("read data:", i)
		time.Sleep(time.Second)
	}
}
func main() {
	intChan := make(chan int, 5)
	go write(intChan)
	go read(intChan)

	time.Sleep(10 * time.Second)
}

同步模式

默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。

package main

import "fmt"

func main() {
	data := make(chan string) // 数据交换队列
	exit := make(chan bool)   // 退出通知

	go func() {
		for d := range data { // 从队列迭代接收数据,直到 close 。
			fmt.Println(d)
		}
		fmt.Println("received over")
		exit <- true // 发出退出通知。
	}()
	data <- "oldboy" // 发送数据。
	data <- "Linux"
	data <- "GOlang"
	data <- "python"
	close(data) // 关闭队列。
	fmt.Println("send over")
	<-exit // 等待退出通知。
}

异步模式

异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。

通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小。

package main

import "fmt"

func main() {
	data := make(chan string, 3) // 缓冲区可以存储 3 个元素
	exit := make(chan bool)

	data <- "old boy" // 在缓冲区未满前,不会阻塞。
	data <- "python"
	data <- "linux"

	go func() {
		for d := range data { // 在缓冲区未空前,不会阻塞。
			fmt.Println(d)
		}
		//  表示读取出data通道中数据
		exit <- true
	}()
	data <- "java" // 如果缓冲区已满,阻塞。
	data <- "C"
	close(data)
	<-exit
}

chan 选择

如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。

用 select 实现超时控制:

package main

import (
	"fmt"
	"time"
)

func main() {
	exit := make(chan bool)
	intChan := make(chan int, 2)
	strChan := make(chan string, 2)

	go func() {
		select {
		case vi := <-intChan:
			fmt.Println(vi)
		case vs := <-strChan:
			fmt.Println(vs)
		case <-time.After(time.Second * 3):
			fmt.Println("timeout.")
		}

		exit <- true
	}()

	// intChan <- 100 // 注释掉,引发 timeout。
	// strChan <- "oldboy"

	<-exit
}

在循环中使用 select default case 需要小心,避免形成洪水。

简单工厂模式

用简单工厂模式打包并发任务和 channel。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func NewTest() chan int {
	c := make(chan int)
	rand.Seed(time.Now().UnixNano())
	go func() {
		time.Sleep(time.Second)
		c <- rand.Int()
	}()
	return c
}
func main() {
	t := NewTest()
	fmt.Println(<-t) // 等待 goroutine 结束返回。
}

Go 语言WaitGroup

WaitGroup能够一直等到所有的goroutine执行完成,并且阻塞主线程的执行,直到所有的goroutine执行完成。

WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。

Add:添加或者减少等待goroutine的数量;

Done:相当于Add(-1);

Wait:执行阻塞,直到所有的WaitGroup数量变成 0;

WaitGroup用于线程同步,WaitGroup等待一组线程集合完成,才会继续向下执行。 主线程(goroutine)调用Add来设置等待的线程(goroutine)数量。 然后每个线程(goroutine)运行,并在完成后调用Done。 同时,Wait用来阻塞,直到所有线程(goroutine)完成才会向下执行。

WaitGroup实例如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(n int) {
            // defer wg.Done()
            defer wg.Add(-1)
            EchoNum(n)
        }(i)
    }
    wg.Wait()
}

func EchoNum(i int) {
    time.Sleep(time.Second)
    fmt.Println(i)
}

程序中将每次循环的数量 sleep 1 秒钟后输出。如果程序不使用WaitGroup,将不会输出结果。因为goroutine还没执行完,主线程已经执行完毕。
注掉的 defer wg.Done() 和 defer wg.Add(-1) 作用一样。

WaitGroup应用

一、用 channel 实现信号量 (semaphore)。

package main

import (
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(3) //增加三个线程
	sem := make(chan int, 1)
	for i := 0; i < 3; i++ {
		go func(id int) {
			defer wg.Done() //减少一个线程
			sem <- 1        // 向 sem 发送数据,阻塞或者成功。
			for x := 0; x < 3; x++ {
				println(id, x)
			}
			<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据
		}(i)
	}
	wg.Wait()

}

D:goprogramgosrcday10       
λ go run test.go                
0 0                             
0 1                             
0 2                             
1 0                             
1 1                             
1 2                             
2 0                             
2 1                             
2 2                             

二、用 closed channel 发出退出通知。

package main

import (
	"sync"
	"time"
)

func main() {
	wg := sync.WaitGroup{}
	exit := make(chan bool)
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			task := func() {
				println(n, time.Now().String())
				time.Sleep(1 * time.Second)
			}
			for {
				select {
				case <-exit: // closed channel 不会阻塞,因此可用作退出通知。
					return
				default: // 执行正常任务。
					task()
				}
			}
		}(i)
	}
	time.Sleep(time.Second * 3) // 让测试 goroutine 运行一会。
	close(exit)                 // 发出退出通知。
	wg.Wait()
}

WaitGroup陷阱

一、add 数量小于done数量导致 WaitGroup数量为负数

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)

    oldboy := func() {
        time.Sleep(time.Second)
        fmt.Println("The old boy welcomes you.")
        wg.Done()
    }

    go oldboy()
    go oldboy()
    go oldboy()

    time.Sleep(time.Second * 3)
    wg.Wait()
}

运行错误:

panic: sync: negative WaitGroup counter

二、add 数量大于 done 数量造成 deadlock

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(4)

    oldboy := func() {
        time.Sleep(time.Second)
        fmt.Println("The old boy welcomes you.")
        wg.Done()
    }

    go oldboy()
    go oldboy()
    go oldboy()

    time.Sleep(time.Second * 3)
    wg.Wait()
}

运行错误:

fatal error: all goroutines are asleep - deadlock!

三、跳过 add 和 Done 操作,直接执行 Wait

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.WaitGroup{}

    for i := 0; i < 5; i++ {
        go func(wg sync.WaitGroup, i int) {
            wg.Add(1)
            fmt.Printf("i=>%d
", i)
            wg.Done()
        }(wg, i)
    }
    wg.Wait()
    fmt.Println("exit")
}

WaitGroup 同步的是 goroutine, 而上面的代码却在 goroutine 中进行 Add(1) 操作。因此,可能在这些 goroutine 还没来得及 Add(1) 就已经执行 Wait 操作了。

四、WaitGroup 拷贝传值问题

package main

import (
    "fmt"

    "sync"
)

func main() {
    wg := sync.WaitGroup{}

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(wg sync.WaitGroup, i int) {
            fmt.Printf("i=>%d
", i)
            wg.Done()
        }(wg, i)
    }
    wg.Wait()
}

运行错误:

fatal error: all goroutines are asleep - deadlock!

wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的,因此 Wait 就死锁了。

正确代码实例如下:

package main

import (
    "fmt"

    "sync"
)

func main() {
    wg := new(sync.WaitGroup)
    // wg := &sync.WaitGroup{}

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(wg *sync.WaitGroup, i int) {
            fmt.Printf("i=>%d
", i)
            wg.Done()
        }(wg, i)
    }
    wg.Wait()
}

Go 语言runtime

runtime包提供Go语言运行时的系统交互的操作,例如控制goruntine的功能。

调度器不能保证多个 goroutine 执行次序,且进程退出时不会等待它们结束。

默认情况下,进程启动后仅允许一个系统线程服务于 goroutine。可使用环境变量或标准库函数 runtime.GOMAXPROCS 修改,让调度器用多个线程实现多核并行,而不仅仅是并发。

runtime包常用方法

const GOOS string = theGoos

GOOS是可执行程序的目标操作系统(将要在该操作系统的机器上执行):darwin、freebsd、linux等。

func Gosched()

Gosched使当前go程放弃处理器,以让其它go程运行。它不会挂起当前go程,因此当前go程未来会恢复执行。

func NumCPU() int

NumCPU返回本地机器的逻辑CPU个数。

func GOROOT() string

GOROOT返回Go的根目录。如果存在GOROOT环境变量,返回该变量的值;否则,返回创建Go时的根目录。

func GOMAXPROCS(n int) int

GOMAXPROCS设置可同时执行的最大CPU数,并返回先前的设置。 若 n < 1,它就不会更改当前设置。本地机器的逻辑CPU数可通过 NumCPU 查询。本函数在调度程序优化后会去掉。

func Goexit()

Goexit终止调用它的go程。其它go程不会受影响。Goexit会在终止该go程前执行所有defer的函数。

在程序的main go程调用本函数,会终结该go程,而不会让main返回。因为main函数没有返回,程序会继续执行其它的go程。如果所有其它go程都退出了,程序就会崩溃。

func NumGoroutine() int

NumGoroutine返回当前存在的Go程数。

runtime包应用

一、查看机器的逻辑CPU个数、Go的根目录、操作系统

package main

import "runtime"

func main() {
	println("cpu:", runtime.NumCPU())
	println("go:", runtime.GOROOT())
	println("操作系统:", runtime.GOOS)

}

D:goprogramgosrcday10
λ go run test.go
cpu: 4
go: D:gogo
操作系统: windows

二、GOMAXPROCS 设置golang运行的cpu核数

Golang 默认所有任务都运行在一个 cpu 核里,如果要在 goroutine 中使用多核,可以使用 runtime.GOMAXPROCS 函数修改,当参数小于 1 时使用默认值。

package main

import (
    "fmt"
    "runtime"
)

var (
    signal = false
)

func oldboy() {
    signal = true
}

func init() {
    runtime.GOMAXPROCS(1)
}

func main() {
    go oldboy()
    for {
        if signal {
            break
        }
    }
    fmt.Println("end")
}

上述代码单核执行如果for前面或者中间不延迟,主线程不会让出CPU,导致异步的线程无法执行,从而无法设置signal的值,从而出现死循环。

运行的cpu核数设置成2核

package main

import (
    "fmt"
    "runtime"
)

var (
    signal = false
)

func oldboy() {
    signal = true
}

func init() {
    runtime.GOMAXPROCS(2)
}

func main() {
    go oldboy()
    for {
        if signal {
            break
        }
    }
    fmt.Println("end")
}

运行结果:

end

三、Gosched 让当前的 goroutine 让出 CPU

这个函数的作用是让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞。当前的 goroutine 不会挂起,当前的 goroutine 程未来会恢复执行。

runtime.Gosched()用于让出CPU时间片。这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。

package main

import (
	"runtime"
	"sync"
)

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go func() {
		for i := 0; i < 6; i++ {
			println(i)
			runtime.Gosched()
		}
		defer wg.Done()
	}()
	for i := 0; i < 6; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			println("Hello.Golang!")
		}()
	}
	wg.Wait()
}

D:goprogramgosrcday10
λ go run test.go
0
1
2
3
4
5
Hello.Golang!
Hello.Golang!
Hello.Golang!
Hello.Golang!
Hello.Golang!
Hello.Golang!

四、Goexit 终止当前 goroutine 执行

调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    wg := new(sync.WaitGroup)
    wg.Add(1)

    go func() {
        defer wg.Done()
        defer fmt.Println("A.defer")
        func() {
            defer fmt.Println("B.defer")
            runtime.Goexit() // 终止当前 goroutine
            fmt.Println("B") // 不会执行
        }()

        fmt.Println("A") // 不会执行
    }()

    wg.Wait()
}
B.defer
A.defer


以上是关于Go语言之Go语言并发的主要内容,如果未能解决你的问题,请参考以下文章

Go语言基础之并发

Go语言基础之并发

Go语言基础之并发

Go语言之并发

深度Go并发编程之Go语言概述

Go语言系列之并发编程