go语言并发同步 select

Posted 两片空白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go语言并发同步 select相关的知识,希望对你有一定的参考价值。

目录

前言

使用

前言

当协程通过多条管道来进行数据通信时,接收端当没有接收到数据,会一直等待。导致后面的管道即使有数据也收不到。

如下,当c1没有数据时,会一直再那等待,即使c2有数据也接收不到。

package main

func main() 
	var c1, c2 chan int
	n1 := <-c1
	n2 := <-c2


这样效率会很低,select可以实现当那个管道数据先来,可以先被接收到。

使用

 1. 简单使用

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func worker(in chan int) 
	i := 0
	for 
		time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
		in <- i
		i++
	


func createChan() chan int 
	c := make(chan int)
	go worker(c)
	return c


func main() 
	c1, c2 := createChan(), createChan()
	//不断从c1,c2中收数据,那个先收到,哪个先执行
	for 
		select 
		case n1 := <-c1:
			fmt.Println("receive c1:", n1)
		case n2 := <-c2:
			fmt.Println("receive c2:", n2)

		
	


 2. 又进行收,又进行发

package main

import (
	"fmt"
	"math/rand"
	"time"
)

//发送数据
func generator() chan int 
	c := make(chan int)
	go func() 
		i := 0
		for 
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
			c <- i
			i++
		
	()
	return c


//接收数据
func worker(id int, c chan int) 
	for n := range c 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
	


//返回一个channel
func createWoker(id int) chan<- int 
	c := make(chan int)
	go worker(id, c)
	return c


func main() 
	c1, c2 := generator(), generator()
	worker := createWoker(0)

	//不断从c1,c2中收数据,那个先收到,哪个先执行
	for 
		//var activeWorker chan<- int //初始值为nil
		var n int
		select 
		case n = <-c1:
		case n = <-c2:
		case worker <- n:
			
		
	


由于一开始c1和c2没有数据,就会导致一直执行worker <-n,n初始值为0,会一直打印0

 我们可以利用chan的初始值为nil,select对于nil是不会起作用的。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

//发送数据
func generator() chan int 
	c := make(chan int)
	go func() 
		i := 0
		for 
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
			c <- i
			i++
		
	()
	return c


//接收数据
func worker(id int, c chan int) 
	for n := range c 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
	


//返回一个channel
func createWoker(id int) chan<- int 
	c := make(chan int)
	go worker(id, c)
	return c


func main() 
	c1, c2 := generator(), generator()
	worker := createWoker(0)

	hasValue := false
	var n int

	//不断从c1,c2中收数据,那个先收到,哪个先执行
	for 
		var activeWorker chan<- int //初始值为nil
		if hasValue 
			activeWorker = worker
		
		select 
		case n = <-c1:
			hasValue = true
		case n = <-c2:
			hasValue = true
		case activeWorker <- n:
			hasValue = false
		
	

 但是这样还是会有一个缺点,由于从c1,c2接收数据都复制给了n,当消耗数据比接收数据慢时,有的数据就会被跳过了。

再消耗数据加上延时,比产生数据慢。

//接收数据
func worker(id int, c chan int) 
	for n := range c 
		//消耗数据比接收数据慢
		time.Sleep(time.Second)
		fmt.Printf("worker %d, reseived %d\\n", id, n)
	

 将数据保存再一个切片中。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

//发送数据
func generator() chan int 
	c := make(chan int)
	go func() 
		i := 0
		for 
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
			c <- i
			i++
		
	()
	return c


//接收数据
func worker(id int, c chan int) 
	for n := range c 
		//消耗数据比接收数据慢
		time.Sleep(time.Second)
		fmt.Printf("worker %d, reseived %d\\n", id, n)
	


//返回一个channel
func createWoker(id int) chan<- int 
	c := make(chan int)
	go worker(id, c)
	return c


func main() 
	c1, c2 := generator(), generator()
	worker := createWoker(0)

	var values []int
	//不断从c1,c2中收数据,那个先收到,哪个先执行
	for 
		var actualValue int
		var activeWorker chan<- int //初始值为nil
		if len(values) > 0 
			activeWorker = worker
			actualValue = values[0]
		
		select 
		case n := <-c1:
			values = append(values, n)
		case n := <-c2:
			values = append(values, n)

		case activeWorker <- actualValue:
			values = values[1:]
		
	


 3. 一段时间后退出程序

        需求,程序运行10s后退出,如果在800ms没生成数据,会打印timeout。

使用到time.After,返回一个chan.Time的管道,到时间会发送数据

func main() 
	c1, c2 := generator(), generator()
	worker := createWoker(0)

	var values []int
	//总时间
	tm := time.After(10 * time.Second)
	//不断从c1,c2中收数据,那个先收到,哪个先执行
	for 
		var actualValue int
		var activeWorker chan<- int //初始值为nil
		if len(values) > 0 
			activeWorker = worker
			actualValue = values[0]
		
		select 
		case n := <-c1:
			values = append(values, n)
		case n := <-c2:
			values = append(values, n)

		case activeWorker <- actualValue:
			values = values[1:]
		case <-time.After(800 * time.Millisecond):
			fmt.Println("timeout")
		case <-tm:
			fmt.Println("bye")
			return
		
	

 需求,每隔一秒钟看一下切片的长度。

用到time.Tick

func main() 
	c1, c2 := generator(), generator()
	worker := createWoker(0)

	var values []int
	//总时间
	tm := time.After(10 * time.Second)
	// 即使时间
	tk := time.Tick(time.Second)
	//不断从c1,c2中收数据,那个先收到,哪个先执行
	for 
		var actualValue int
		var activeWorker chan<- int //初始值为nil
		if len(values) > 0 
			activeWorker = worker
			actualValue = values[0]
		
		select 
		case n := <-c1:
			values = append(values, n)
		case n := <-c2:
			values = append(values, n)

		case activeWorker <- actualValue:
			values = values[1:]
		case <-time.After(800 * time.Millisecond):
			fmt.Println("timeout")
		case <-tk:
			fmt.Println("slice len:", len(values))
		case <-tm:
			fmt.Println("bye")
			return
		
	

 注意:

总时间tm和计时时间tk,需要定义在for循环外面,不然每次都是新的,都会重新计时

超时时间需要定义在里面,需要每次重新计时。

以上是关于go语言并发同步 select的主要内容,如果未能解决你的问题,请参考以下文章

2021-GO语言并发编程

2021-GO语言并发编程

Go语言常见的并发模式

Go语言学习之旅--并发编程

Go语言学习之旅--并发编程

Go语言学习之旅--并发编程