Go 语言 Channel

Posted 极客江南

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go 语言 Channel相关的知识,希望对你有一定的参考价值。

多线程同步问题

  • 互斥锁
    • 互斥锁的本质是当一个goroutine访问的时候, 其它goroutine都不能访问
    • 这样就能实现资源同步, 但是在避免资源竞争的同时也降低了程序的并发性能. 程序由原来的并发执行变成了串行
  • 案例:
    • 有一个打印函数, 用于逐个打印字符串中的字符, 有两个人都开启了goroutine去打印
    • 如果没有添加互斥锁, 那么两个人都有机会输出自己的内容
    • 如果添加了互斥锁, 那么会先输出某一个的, 输出完毕之后再输出另外一个人的
package main
import (
	"fmt"
	"sync"
	"time"
)
// 创建一把互斥锁
var lock sync.Mutex

func printer(str string)  {
	// 让先来的人拿到锁, 把当前函数锁住, 其它人都无法执行
	// 上厕所关门
	lock.Lock()
	for _, v := range str{
		fmt.Printf("%c", v)
		time.Sleep(time.Millisecond * 500)
	}
	// 先来的人执行完毕之后, 把锁释放掉, 让其它人可以继续使用当前函数
	// 上厕所开门
	lock.Unlock()
}
func person1()  {
	printer("hello")
}
func person2()  {
	printer("world")
}
func main() {
	go person1()
	go person2()
	for{
		;
	}
}

生产者消费者问题

  • 所谓的生产者消费者模型就是
    • 某个模块(函数)负责生产数据, 这些数据由另一个模块来负责处理
    • 一般生产者消费者模型包含三个部分"生产者"、“缓冲区”、“消费者”
  • 为什么生产者消费者模型要含三个部分? 直接生产和消费不行么?
  • 一个案例说明一切
    • 生产者好比现实生活中的某个人
    • 缓冲区好比现实生活中的邮箱
    • 消费者好比现实生活中的邮递员
  • 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员, 那么如果将来过去的邮递员离职了, 你想邮寄信件必须想办法结识新的邮递员(消费者发生变化, 会直接影响生产者, 耦合性太强)
  • 如果在生产者和消费者之间添加一个缓冲区, 那么就好比有了邮箱, 以后邮寄信件不是找邮递员, 只需把信件投递到邮箱中即可, 写信的人不需要关心邮递员是谁(解耦)
  • 如果只有生产者和消费者, 那么每个人邮寄信件都需要直接找邮递员(1对1关系), 如果有10个人要邮寄信件, 那么邮递员只能依次找到每个人, 然后才能取件(效率低下)
  • 如果在生产者和消费者之间添加一个缓冲区, 那么所有的人只需要将信件投递到邮箱即可, 邮递员不用关心有多少人要邮寄信件, 也不用依次取件, 只需要找到邮箱从邮箱中统一取件即可(效率提高)
  • 如果只有生产者和消费者, 那么如果邮寄信件太多邮递员无法一次拿走, 这个时候非常难办
  • 如果在生产者和消费者之间添加一个缓冲区, 那么如果信件太多可以先拿走一部分, 剩下的继续放到邮箱中下次再拿
  • ... ...

生产者和消费者资源竞争问题

  • 例如生产比较慢, 而消费比较快, 就会导致消费者消费到错误数据
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)
// 创建一把互斥锁
var lock = sync.Mutex{}

// 定义缓冲区
var sce []int = make([]int, 10)

// 定义生产者
func producer(){
	// 加锁, 注意是lock就是我们的锁, 全局公用一把锁
	lock.Lock()
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		sce[i] = num
		fmt.Println("生产者生产了: ", num)
		time.Sleep(time.Millisecond * 500)
	}
	// 解锁
	lock.Unlock()
}
// 定义消费者
func consumer()  {
	// 加锁, 注意和生产者中用的是同一把锁
	// 如果生产者中已加过了, 则阻塞直到解锁后再重新加锁
	lock.Lock()
	for i:=0;i<10;i++{
		num := sce[i]
		fmt.Println("---消费者消费了", num)
	}
	lock.Unlock()
}

func main() {
	go producer()
	go consumer()
	for{
		;
	}
}
  • 思考: 那如果是一对多, 或者多对多的关系, 上述代码有问题么?

管道(Channel)

  • 上述实现并发的代码中为了保持主线程不挂掉, 我们都会在最后写上一个死循环或者写上一个定时器来实现等待goroutine执行完毕
  • 上述实现并发的代码中为了解决生产者消费者资源同步问题, 我们利用加锁来解决, 但是这仅仅是一对一的情况, 如果是一对多或者多对多, 上述代码还是会出现问题
  • 综上所述, 企业开发中需要一种更牛X的技术来解决上述问题, 那就是管道(Channel)

  • Channel的本质是一个队列
  • Channel是线程安全的, 也就是自带锁定功能
  • Channel声明和初始化
    • 声明: var 变量名chan 数据类型
    • 初始化: mych := make(chan 数据类型, 容量)
    • Channel和切片还有字典一样, 必须make之后才能使用
    • Channel和切片还有字典一样, 是引用类型
package main
import "fmt"
func main() {
	// 1.声明一个管道
	var mych chan int
	// 2.初始化一个管道
	mych = make(chan int, 3)
	// 3.查看管道的长度和容量
	fmt.Println("长度是", len(mych), "容量是", cap(mych))
	// 4.像管道中写入数据
	mych<- 666
	fmt.Println("长度是", len(mych), "容量是", cap(mych))
	// 5.取出管道中写入的数据
	num := <-mych
	fmt.Println("num = ", num)
	fmt.Println("长度是", len(mych), "容量是", cap(mych))
}

  • 注意点:
    • 管道中只能存放声明的数据类型, 不能存放其它数据类型
    • 管道中如果已经没有数据, 再取就会报错
    • 如果管道中数据已满, 再写入就会报错
package main

import "fmt"

func main() {
	// 1.声明一个管道
	var mych chan int
	// 2.初始化一个管道
	mych = make(chan int, 3)

	// 注意点: 管道中只能存放声明的数据类型, 不能存放其它数据类型
	//mych<-3.14

	// 注意点: 管道中如果已经没有数据, 
	// 并且检测不到有其它协程再往管道中写入数据, 那么再取就会报错
	//num = <-mych
	//fmt.Println("num = ", num)

	// 注意点: 如果管道中数据已满, 再写入就会报错
	mych<- 666
	mych<- 777
	mych<- 888
	mych<- 999
}

  • 管道的关闭和遍历
package main

import "fmt"

func main() {
	// 1.创建一个管道
	mych := make(chan int, 3)
	// 2.往管道中存入数据
	mych<-666
	mych<-777
	mych<-888
	// 3.遍历管道
	// 第一次遍历i等于0, len = 3,
	// 第二次遍历i等于1, len = 2
	// 第三次遍历i等于2, len = 1
	//for i:=0; i<len(mych); i++{
	//	fmt.Println(<-mych) // 输出结果不正确
	//}

	// 3.写入完数据之后先关闭管道
	// 注意点: 管道关闭之后只能读不能写
	close(mych)
	//mych<- 999 // 报错

	// 4.遍历管道
	// 利用for range遍历, 必须先关闭管道, 否则会报错
	//for value := range mych{
	//	fmt.Println(value)
	//}

	// close主要用途:
	// 在企业开发中我们可能不确定管道有还没有有数据, 所以我们可能一直获取
	// 但是我们可以通过ok-idiom模式判断管道是否关闭, 如果关闭会返回false给ok
	for{
		if num, ok:= <-mych; ok{
			fmt.Println(num)
		}else{
			break;
		}
	}
	fmt.Println("数据读取完毕")
}

  • Channel阻塞现象
    • 单独在主线程中操作管道, 写满了会报错, 没有数据去获取也会报错
    • 只要在协程中操作管道过, 写满了就会阻塞, 没有就数据去获取也会阻塞
package main
import (
	"fmt"
	"time"
)
// 创建一个管道
var myCh = make(chan int, 5)
func demo()  {
	var myCh = make(chan int, 5)
	//myCh<-111
	//myCh<-222
	//myCh<-333
	//myCh<-444
	//myCh<-555
	//fmt.Println("我是第六次添加之前代码")
	//myCh<-666
	//fmt.Println("我是第六次添加之后代码")

	fmt.Println("我是第六次直接获取之前代码")
	<-myCh
	fmt.Println("我是第六次直接获取之后代码")
}
func test()  {
	//myCh<-111
	//myCh<-222
	//myCh<-333
	//myCh<-444
	//myCh<-555
	//fmt.Println("我是第六次添加之前代码")
	//myCh<-666
	//fmt.Println("我是第六次添加之后代码")

	//fmt.Println("我是第六次直接获取之前代码")
	//<-myCh
	//fmt.Println("我是第六次直接获取之后代码")
}
func example()  {
	time.Sleep(time.Second * 2)
	myCh<-666
}
func main() {
	// 1.同一个go程中操作管道
	// 写满了会报错
	//myCh<-111
	//myCh<-222
	//myCh<-333
	//myCh<-444
	//myCh<-555
	//myCh<-666

	// 没有了去取也会报错
	//<-myCh

	// 2.在协程中操作管道
	// 写满了不会报错, 但是会阻塞
	//go test()

	// 没有了去取也不会报错, 也会阻塞
	//go test()

	//go demo()
	//go demo()
	
	// 3.只要在协程中操作了管道, 就会发生阻塞现象
	go example()
	fmt.Println("myCh之前代码")
	<-myCh
	fmt.Println("myCh之后代码")

	//for{
	//	;
	//}
}

  • 利用Channel实现生产者消费者
package main

import (
	"fmt"
	"math/rand"
	"time"
)
// 定义缓冲区
var myCh = make(chan int, 5)
var exitCh = make(chan bool, 1)

// 定义生产者
func producer(){
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		fmt.Println("生产者生产了: ", num)
		// 往管道中写入数据
		myCh<-num
		//time.Sleep(time.Millisecond * 500)
	}
	// 生产完毕之后关闭管道
	close(myCh)
	fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer()  {
	// 不断从管道中获取数据, 直到管道关闭位置
	for{
		if num, ok := <-myCh; !ok{
			break
		}else{
			fmt.Println("---消费者消费了", num)
		}
	}
	fmt.Println("消费者停止消费")
	exitCh<-true
}

func main() {
	go producer()
	go consumer()
	fmt.Println("exitCh之前代码")
	<-exitCh
	fmt.Println("exitCh之后代码") 
}

  • 无缓冲Channel
package main
import "fmt"
var myCh1 = make(chan int, 5)
var myCh2 = make(chan int, 0)
func main() {
	// 有缓冲管道
	// 只写入, 不读取不会报错
	//myCh1<-1
	//myCh1<-2
	//myCh1<-3
	//myCh1<-4
	//myCh1<-5
	//fmt.Println("len =",len(myCh1), "cap =", cap(myCh1))

	// 无缓冲管道
	// 只有两端同时准备好才不会报错
	go func() {
		fmt.Println(<-myCh2)
	}()
	// 只写入, 不读取会报错
	myCh2<-1
	//fmt.Println("len =",len(myCh2), "cap =", cap(myCh2))
	// 写入之后在同一个线程读取也会报错
	//fmt.Println(<-myCh2)
	// 在主程中先写入, 在子程中后读取也会报错
	//go func() {
	//	fmt.Println(<-myCh2)
	//}()
}

  • 无缓冲Channel和有缓冲Channel
    • 有缓冲管道具备异步的能力(写几个读一个或读几个)
    • 无缓冲管道具备同步的能力(写一个读一个)
package main
import (
	"fmt"
	"math/rand"
	"time"
)
// 定义缓冲区
//var myCh = make(chan int, 0)
var myCh = make(chan int)
var exitCh = make(chan bool, 1)

// 定义生产者
func producer(){
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		fmt.Println("生产者生产了: ", num)
		// 往管道中写入数据
		myCh<-num
		//time.Sleep(time.Millisecond * 500)
	}
	// 生产完毕之后关闭管道
	close(myCh)
	fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer()  {
	// 不断从管道中获取数据, 直到管道关闭位置
	for{
		if num, ok := <-myCh; !ok{
			break
		}else{
			fmt.Println("---消费者消费了", num)
		}
	}
	fmt.Println("消费者停止消费")
	exitCh<-true
}

func main() {
	go producer()
	go consumer()
	fmt.Println("exitCh之前代码")
	<-exitCh
	fmt.Println("exitCh之后代码")
}

IO的延迟说明:
看到的输出结果和我们想象的不太一样, 是因为IO输出非常消耗性能, 输出之后还没来得及赋值可能就跑去执行别的协程了


  • 单向管道和双向管道
    • 默认情况下所有管道都是双向了(可读可写)
    • 但是在企业开发中, 我们经常需要用到将一个管道作为参数传递
    • 在传递的过程中希望对方只能单向使用, 要么只能写,要么只能读
  • 双向管道
    • var myCh chan int = make(chan int, 0)
  • 单向管道
    • var myCh chan<- int = make(chan<- int, 0)
    • var myCh <-chan int = make(<-chan int, 0)
  • 注意点:
    • 双向管道可以自动转换为任意一种单向管道
    • 单向管道不能转换为双向管道
package main

import "fmt"

func main() {
	// 1.定义一个双向管道
	var myCh chan int = make(chan int, 5)

	// 2.将双向管道转换单向管道
	var myCh2 chan<- int
	myCh2 = myCh
	fmt.Println(myCh2)
	var myCh3 <-chan int
	myCh3 = myCh
	fmt.Println(myCh3)

	// 3.双向管道,可读可写
	myCh<-1
	myCh<-2
	myCh<-3
	fmt.Println(<-myCh)
	
	// 3.只写管道,只能写, 不能读
	//	myCh2<-666
	//	fmt.Println(<-myCh2)

	// 4.指读管道, 只能读,不能写
	fmt.Println(<-myCh3)
	//myCh3<-666
	
	// 注意点: 管道之间赋值是地址传递, 以上三个管道底层指向相同容器
}
  • 单向管道作为函数参数
package main
import (
	"fmt"
	"math/rand"
	"time"
)
// 定义生产者
func producer(myCh chan<- int){
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		fmt.Println("生产者生产了: ", num)
		// 往管道中写入数据
		myCh<-num
		//time.Sleep(time.Millisecond * 500)
	}
	// 生产完毕之后关闭管道
	close(myCh)
	fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer(myCh <-chan int)  {
	// 不断从管道中获取数据, 直到管道关闭位置
	for{
		if num, ok := <-myCh; !ok{
			break
		}else{
			fmt.Println("---消费者消费了", num)
		}
	}
	fmt.Println("消费者停止消费")

}

func main() {
	// 定义缓冲区
	var myCh = make(chan int, 5)
	go producer(myCh)
	consumer(myCh)
}

select选择结构

  • select是Go中的一个控制结构,类似于switch语句,用于处理异步IO操作
    • 如果有多个case都可以运行,select会随机选出一个执行,其他不会执行。
    • 如果没有可运行的case语句,且有default语句,那么就会执行default的动作。
    • 如果没有可运行的case语句,且没有default语句,select将阻塞,直到某个case通信可以运行
	select {
	case IO操作1:
		IO操作1读取或写入成功就执行
	case IO操作2:
		IO操作2读取或写入成功就执行
	default:
		如果上面case都没有成功,则进入default处理流程
	}
  • 注意点:
    • select的case后面必须是一个IO操作
    • 一般情况下使用select结构不用写default
package main

import (
	"fmt"
	"time"
)
func main() {
	// 创建管道
	var myCh = make(chan int)
	var exitCh = make(chan bool)

	// 生产数据
	go func() {
		for i:=0;i <10;i++{
			myCh<-i
			time.Sleep(time.Second)
		}
		//close(myCh)
		exitCh<-true
	}()

	// 读取数据
	for{
		fmt.Println("读取代码被执行了")
		select {
		case num:= <-myCh:
			fmt.Println("读到了", num)
		case <-exitCh:
			//break // 没用, 跳出的是select
			return
		}
		fmt.Println("-----------")
	}
}
  • select应用场景
    • 实现多路监听
    • 实现超时处理
package main
import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	// 1.创建管道
	myCh := make(chan int, 5)
	exitCh := make(chan bool)

	// 2.生成数据
	go func() {
		for i:=0; i<10; i++ {
			myCh<-i
			time.Sleep(time.Second * 3)
		}
	}()

	// 3.获取数据
	go func() {
		for{
			select {
			case num:= <-myCh:
				fmt.Println(num)
			case <-time.After(time.Second * 2):
				exitCh<-true
				runtime.Goexit()
			}
		}
	}()

	<-exitCh
	fmt.Println("程序结束")
}

定时器补充

  • 一次性定时器
  • NewTimer函数
    • func NewTimer(d Duration) *Timer
    • NewTimer创建一个Timer,它会在到期后向Timer自身的C字段发送当时的时间
type Timer struct {
	C <-chan Time // 对于我们来说, 这个属性是只读的管道
	r runtimeTimer
}
package main
import (
	"fmt"
	"time"
)
func main() {
	start := time.Now()
	fmt.Println("开始时间", start)
	timer := time.NewTimer(time.Second * 3)
	fmt.Println("读取之前代码被执行")
	end := <-timer.C // 系统写入数据之前会阻塞
	fmt.Println("读取之后代码被执行")
	fmt.Println("结束时间", end)
}
  • After函数
    • func After(d Duration) <-chan Time
    • 底层就是对NewTimer的封装, 只不过返回值不同而已
func After(d Duration) <-chan Time {
	return NewTimer(d).C
}
package main
import (
	"fmt"
	"time"
)
func main() {
	start := time.Now()
	fmt.Println("开始时间", start)
	timer := time.After(time.Second * 3)
	fmt.Println("读取之前代码被执行")
	end := <-timer // 系统写入数据之前会阻塞
	fmt.Println("读取之后代码被执行")
	fmt.Println("结束时间", end)
}

  • 周期性定时器
  • NewTicker函数
    • func NewTicker(d Duration) *Ticker
    • 和NewTimer差不多, 只不过NewTimer只会往管道中写入一次数据, 而NewTicker每隔一段时间就会写一次
type Ticker struct {
    C <-chan Time // 周期性传递时间信息的通道
    // 内含隐藏或非导出字段
}
package main
import (
	"fmt"
	"time"
)
func main() {
	// 1.创建一个周期定时器
	ticker := time.NewTicker(time.Second)
	// 2.不断从重启定时器中获取时间
	for{
		t := <-ticker.C // 系统写入数据之前会阻塞
		fmt.Println(t)
	}
}

以上是关于Go 语言 Channel的主要内容,如果未能解决你的问题,请参考以下文章

go语言管道(channel)

Go语言编程:使用条件变量Cond和channel通道实现多个生产者和消费者模型

Go语言入门goroutine和channel

一文玩转go语言中的channel

Go语言学习——channel的死锁其实没那么复杂

[日常] Go语言圣经--Channel习题