并发Goroute
Posted 关灯吃面
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发Goroute相关的知识,希望对你有一定的参考价值。
Goroutine(轻量级的线程,开线程没有数量限制)
1.进程和线程
A.进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
B.线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
C.一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。
2、进程和线程
ngix是多进程的单线程程序
3.并发和并行
A.多线程程序在一个核的cpu上运行,就是并发。go多线程的切换都是在用户态操作的,不像其他语言先切换到内核态,完成线程切换,然后返回用户态继续执行程序。
B.多线程程序在多个核的cpu上运行,就是并行
4.协程和线程
协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的
线程:一个线程上可以跑多个协程,协程是轻量量级的线程。一个线程可以跑多个Goroutine。
5.goroutine调度模型
M是物理线程,P是资源,G是Goroutine
如果有IO操作时,会新起一个线程等待IO操作的Goroutine
6.如何设置golang运行的cpu核数
1.5之前go需要手动设置程序执行的内核数,1.5之后go自动设置
package main
import (
"fmt"
"runtime"
)
func main() {
num := runtime.NumCPU() //查看有几个内核
fmt.Printf("cpu num:%d\\n", num)
runtime.GOMAXPROCS(1) //设置有程序用几个内核执行
}
7、不同goroutine之间进行通讯
a.全局变量
package main import ( "fmt" "time" ) var exits [3]bool func calc(index int) { for i := 0; i < 1000; i++ { time.Sleep(time.Millisecond) //休息1毫秒 } exits[index] = true } func main() { start := time.Now().UnixNano() go calc(0) //起一个goroutine,不用go就是串行 go calc(1) go calc(2) for { //保证主线程不挂断,子线程才能执行 if exits[0] && exits[1] && exits[2] { break } time.Sleep(time.Millisecond) } end := time.Now().UnixNano() fmt.Printf("finished, cost:%d ms\\n", (end-start)/1000/1000) }
b.锁同步
package main import ( "fmt" "sync" "time" ) func calc(index int, waitGroup *sync.WaitGroup) { for i := 0; i < 1000; i++ { time.Sleep(time.Millisecond) } waitGroup.Done() //释放一把锁 } func main() { var waitGroup sync.WaitGroup //等待一组Goroutine执行完毕 start := time.Now().UnixNano() for i := 0; i < 3; i++ { waitGroup.Add(1) //添加一把锁 go calc(i, &waitGroup) } waitGroup.Wait() end := time.Now().UnixNano() fmt.Printf("finished, cost:%d ms\\n", (end-start)/1000/1000) }
c.Channel(oroutine和channel相结合)
package main import ( "fmt" "time" ) func () { var intChan chan int = make(chan int, 1) //如果是0就是没有缓冲区的管道,必须有程序来取才能放进去 go func() { intChan <- 10 }() result := <-intChan fmt.Printf("result:%d\\n", result) }
8.goroutine中使用recover
应用场景,如果某个goroutine panic了,而且这个goroutine 里面没有捕获(recover),那么整个进程就会挂掉。所以,好的习惯是每当go产生一个goroutine,就需要写下recover。
package main import ( "fmt" "time" ) func calc() { defer func() { //defer必须放置在最前面,才能捕获后面所有的panic,程序退出时执行defer err := recover() //捕获goroutine错误 if err != nil { fmt.Println(err) } }() var p *int *p = 100 } func main() { go calc() time.Sleep(time.Second * 3) fmt.Println("progress exited") }
Channel
1、channel概念
- 类似unix中管道(pipe)
- 先进先出
- 线程安全,多个goroutine同时访问,不需要加锁
- channel是有类型的, 一个整数的channel只能存放整数
2、 channel声明
var 变量量名 chan 类型
var test chan int
var test chan string
var test chan map[string]string
var test chan stu
3、channel初始化
使用make进行初始化,比如:
var test chan int
test = make(chan int, 10)
var test chan string
test = make(chan string, 10)
4、channel基本操作
1. 从channel读取数据:
var testChan chan int
testChan = make(chan int, 10)
var a int
a = <- testChan
2. 从channel写 入数据:
var testChan chan int
testChan = make(chan int, 10)
var a int = 10
testChan <- a
5.带缓冲区的channel
1.如下所示,testChan只能放 一个元素:
var testChan chan int
testChan = make(chan int)
var a int
a = <- testChan
2.如下所示,testChan是带缓冲区的chan, 一次可以放10个元素:
var testChan chan int
testChan = make(chan int, 10)
var a int = 10
testChan <- a
package main import ( "fmt" "time" ) func test() { var intChan chan int = make(chan int, 1) //如果是0就是没有缓冲区的管道,必须有程序来取才能放进去 go func() { intChan <- 10 }() result := <-intChan fmt.Printf("result:%d\\n", result) } func testNoBufChan() { var intChan chan int = make(chan int, 2) go func() { fmt.Printf("begin input to chan\\n") intChan <- 10 intChan <- 10 fmt.Printf("end input to chan\\n") }() //result := <- intChan //fmt.Printf("result:%d\\n", result) time.Sleep(10 * time.Second) } func main() { test() testNoBufChan() }
package main import ( "fmt" "sync" ) var waitGroup sync.WaitGroup func produce(ch chan<- string) { //生产者 ch <- "hello1" ch <- "hello2" ch <- "hello3" close(ch) //关闭管道 waitGroup.Done() } func consume(ch <-chan string) { //消费者 for { str, ok := <-ch //判断管道是否关闭 if !ok { fmt.Printf("ch is closed") break } fmt.Printf("value:%s\\n", str) } waitGroup.Done() } func main() { //生产者消费者模型 var ch chan string = make(chan string) waitGroup.Add(2) go produce(ch) go consume(ch) waitGroup.Wait() }
6. channel阻塞
package main import ( "fmt" "time" ) func sendData(ch chan string) { var i int for { var str string str = fmt.Sprintf("stu %d", i) fmt.Println("write:", str) ch <- str i++ } } func main() { ch := make(chan string) go sendData(ch) time.Sleep(100 * time.Second) }
7.chan之间的同步
package main import ( "fmt" "time" ) func sendData(ch chan string) { ch <- "Washington" ch <- "Tripoli" ch <- "London" ch <- "Beijing" ch <- "Tokio" } func getData(ch chan string) { var input string for { input = <-ch fmt.Println(input) } } func main() { ch := make(chan string) go sendData(ch) go getData(ch) time.Sleep(100 * time.Second) }
8.for range遍历chan
package main import ( "fmt" "time" ) func sendData(ch chan string) { ch <- "Washington" ch <- "Tripoli" ch <- "London" ch <- "Beijing" ch <- "Tokio" } func getData(ch chan string) { for input := range ch { fmt.Println(input) } } func main() { ch := make(chan string) go sendData(ch) go getData(ch) time.Sleep(100 * time.Second) }
9.chan的关闭
1.使用内置函数close进行关闭,chan关闭之后,for range遍历chan中已经存在的元素后结束
2.使用内置函数close进行关闭,chan关闭之后,没有使用for range的写法需要使用,v, ok := <- ch进行判断chan是否关闭
10.chan的只读和只写
a.只读chan的声明
Var 变量量的名字 <-chan int
Var readChan <- chan int
b.只写chan的声明
Var 变量量的名字 chan<- int
Var writeChan chan<- int
11. 对chan进行select操作
Select {
case u := <- ch1:
case e := <- ch2:
default:
}
定时器
1、定时器的使用
package main import ( "fmt" "time" ) func main() { t := time.NewTicker(time.Second) for v := range t.C { fmt.Println("hello, ", v) } }
2、一次定时器
package main import ( "fmt" "time" ) func main() { select { Case <- time.After(time.Second): fmt.Println(“after”) } }
3、超时控制
package main import ( "fmt" "time" ) func queryDb(ch chan int) { time.Sleep(time.Second) ch <- 100 } func main() { ch := make(chan int) go queryDb(ch) t := time.NewTicker(time.Second) select { case v := <-ch: fmt.Println("result", v) case <-t.C: fmt.Println("timeout") } }
信号处理
package main import ( "fmt" "os" "os/signal" "sync" "syscall" ) var waitGroup sync.WaitGroup func produce(ch chan<- string, exitChan chan bool) { var i int var exit bool for { str := fmt.Sprintf("hello %d", i) select { //select检测哪个管道可写或者可读 case ch <- str: case exit = <-exitChan: } if exit { fmt.Printf("user notify produce exited\\n") break } } close(ch) waitGroup.Done() } func consume(ch <-chan string) { for { str, ok := <-ch if !ok { fmt.Printf("ch is closed") break } fmt.Printf("value:%s\\n", str) } waitGroup.Done() } func main() { // 在shell终端输入 kill -SIGUSR2 ID 给程序输入终止信号 var ch chan string = make(chan string) var exitChan chan bool = make(chan bool, 1) var sinalChan chan os.Signal = make(chan os.Signal, 1) waitGroup.Add(2) signal.Notify(sinalChan, syscall.SIGUSR2) go produce(ch, exitChan) go consume(ch) <-sinalChan //读取然丢弃 exitChan <- true waitGroup.Wait() }
单元测试
1.文件名必须以_test.go结尾
2.使用go test -v执行单元测试
calc.go
package test
//对函数进行单元测试
func add(a int, b int) int {
return a - b
}
func sub(a int, b int) int {
return a - b
}
calc_test.go
package test import ( "testing" ) func TestAdd(t *testing.T) { //TestAdd必须大写的Test开头 result := add(2, 8) //测试add函数 if result != 10 { t.Fatalf("add is not right") //致命错误记录 return } t.Logf("add is right") //记录一些常规信息 } func TestSub(t *testing.T) { result := sub(2, 8) if result != -6 { t.Fatalf("add is not right") return } t.Logf("add is right") }
以上是关于并发Goroute的主要内容,如果未能解决你的问题,请参考以下文章