go语言管道(channel)

Posted 两片空白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go语言管道(channel)相关的知识,希望对你有一定的参考价值。

前言

        channel式go语言协程中数据通信的双向通道。但是在实际应用中,为了代码的简单和易懂,一般使用的channel是单向的。

使用

1. channel的定义和收发数据

package channel


func main()
	//var c chan int   c的默认值位nil,一般不使用 
	c := make(chan int)
	c <- 1 //发数据
	n := <-c //收数据 

但是上面收发数据的写法是错误的,因为一个协程往管道里发送数据,必须要有一个协程来接受数据,否则会造成死锁。(协程往管道发送数据和从管道接受数据为了防止数据紊乱,会加上锁)

注意:

        协程从管道里接收数据时,当没有收到数据时,会一直等待。这样效率不是很高,使用select,可以实现同步的功能,后面有介绍。

正确写法:

package main

import (
	"fmt"
	"time"
)

func chanDemo() 
	c := make(chan int)
	go func() 
		for 
			n := <-c //收数据
			fmt.Println(n)
		
	()
	c <- 1 //发数据
	c <- 2

	//防止数据还没有在协程里打印,main函数退出
	time.Sleep(time.Millisecond)


func main() 
	chanDemo()

2. channel作为参数

        channel在go语言里作为一等公民,可以作为参数和返回值

package main

import (
	"fmt"
	"time"
)

func worker(c chan int) 
	for 
		n := <-c
		fmt.Println(n)
	


func chanDemo() 
	var channels [10]chan int
	for i := 0; i < 10; i++ 
		channels[i] = make(chan int)
		go worker(channels[i])
	

	for i := 0; i < 10; i++ 
		channels[i] <- i
	

	//防止数据还没有在协程里打印,main函数退出
	time.Sleep(time.Second)


func main() 
	chanDemo()

3. channel做返回值

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) 
	for 
		fmt.Printf("worker %d, reseived %d\\n", id, <-c)
	


//返回一个channel
func createWoker(id int) chan int 
	c := make(chan int)
	go worker(id, c)
	return c


func chanDemo() 
	var channels [10]chan int
	for i := 0; i < 10; i++ 
		channels[i] = createWoker(i)
	

	for i := 0; i < 10; i++ 
		channels[i] <- i
	

	//防止数据还没有在协程里打印,main函数退出
	time.Sleep(time.Second)


func main() 
	chanDemo()

但是在实际应用中,当项目很复杂时,createWoker返回的channel需要告诉调用方,返回的channel该如何使用,我们可以在返回的channel加上限制。如下:

//返回一个channel,只能被接收数据
func createWoker(id int) <-chan int 
	c := make(chan int)
	go worker(id, c)
	return c


//返回一个channel,只能发送数据
func createWoker(id int) chan<- int 
	c := make(chan int)
	go worker(id, c)
	return c

Channel缓存

        上面说,当一个协程向channel发送数据时,必须要有一个协程来接收数据,否则会造成死锁。

package main

import (
	"fmt"
	"time"
)

func channel() 
	c := make(chan int)
	c <- 1


func main() 
	channel()

         但是我们可以建立一个channel缓存,先将数据放到缓存中,需要时再使用,即使,没有协程接收也不会造成死锁。

package main

import (
	"fmt"
	"time"
)

func bufferedChannel() 
	//channel缓存
	c := make(chan int, 3)

	c <- 1
	c <- 2
	c <- 3


func main() 
	bufferedChannel()

 关闭channel,并不是销毁

        当发送完数据,channel是可以被关闭的。

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) 
	for 
		fmt.Printf("worker %d, reseived %d\\n", id, <-c)
	


func closeChannel() 
	c := make(chan int)
	go worker(0, c)
	c <- 1
	c <- 2
	c <- 3
	close(c)

	time.Sleep(time.Millisecond)


func main() 
	bufferedChannel()

        当发送方关闭channel后,接收方还是可以从channel接收到数据,接收到的数据是后面类型的默认值。

         但是,这样是不合理的,接收方同样可以判断发送方是否关闭了channel

        有两种方法可以判断发送方是否关闭了channel

方法一:利用判断。

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) 
	for 
		n, ok := <-c
		//说明发送方关闭了channel
		if !ok 
			break
		
		fmt.Printf("worker %d, reseived %d\\n", id, n)
	


func closeChannel() 
	c := make(chan int)
	go worker(0, c)
	c <- 1
	c <- 2
	c <- 3
	close(c)

	time.Sleep(time.Millisecond)


func main() 
	closeChannel()

 方法二:利用range,当发送方关闭了channel,range也会自动退出。

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) 
	for n := range c 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
	


func closeChannel() 
	c := make(chan int)
	go worker(0, c)
	c <- 1
	c <- 2
	c <- 3
	close(c)

	time.Sleep(time.Millisecond)


func main() 
	closeChannel()

使用channel等待任务结束

        上面在每一个函数中,我们都加了一段延时,来等待协程的结束。但是,这种做法很不好。

        处理一般是,当协程任务结束,通知外层任务结束。一种思想,通过通信来共享内存,而不是通过共享内存来通信。延时就不需要了。

方法一:用一个channel来通知外层

package main

import (
	"fmt"
)

type worker struct 
	in   chan int
	done chan bool


func doWorker(id int, in chan int, done chan bool) 
	for n := range in 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
		//说明任务结束
		done <- true
	


//返回一个channel
func createWoker(id int) worker 
	w := worker
		in:   make(chan int),
		done: make(chan bool),
	
	go doWorker(id, w.in, w.done)
	return w


func chanDemo() 
	var channels [10]worker
	for i := 0; i < 10; i++ 
		channels[i] = createWoker(i)
	

	for i := 0; i < 10; i++ 
		channels[i].in <- i
		//等到任务结束
		<-channels[i].done
	



func main() 
	chanDemo()

注意:

//等到任务结束
		<-channels[i].done

channel发送和接收数据时阻塞式的,必须要有人接收和发送数据,不然会一直等待。

如果在doWorker中,不返回任务结束的消息,会造成死锁。

package main

import (
	"fmt"
)

type worker struct 
	in   chan int
	done chan bool


func doWorker(id int, in chan int, done chan bool) 
	for n := range in 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
		// //说明任务结束
		// done <- true
	


//返回一个channel
func createWoker(id int) worker 
	w := worker
		in:   make(chan int),
		done: make(chan bool),
	
	go doWorker(id, w.in, w.done)
	return w


func chanDemo() 
	var channels [10]worker
	for i := 0; i < 10; i++ 
		channels[i] = createWoker(i)
	

	for i := 0; i < 10; i++ 
		channels[i].in <- i
		//等到任务结束
		<-channels[i].done
	



func main() 
	chanDemo()

 但是,上面正确的情况打印是顺序打印的,没有体现并行的好处。这是因为发送数据时,是发一个数据,等一个数据。我们需要做到,将数据全部发出去,再进行全部的等待。

 

package main

import (
	"fmt"
)

type worker struct 
	in   chan int
	done chan bool


func doWorker(id int, in chan int, done chan bool) 
	for n := range in 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
		// //说明任务结束
		done <- true
	


//返回一个channel
func createWoker(id int) worker 
	w := worker
		in:   make(chan int),
		done: make(chan bool),
	
	go doWorker(id, w.in, w.done)
	return w


func chanDemo() 
	var channels [10]worker
	for i := 0; i < 10; i++ 
		channels[i] = createWoker(i)
	

	//发送一次
	for i, worker := range channels 
		worker.in <- i
	
	//发送第二次
	for i, worker := range channels 
		worker.in <- i * 10
	

	//等待任务结束
	for _, worker := range channels 
		//发送了两次,接收两次
		<-worker.done
		<-worker.done
	



func main() 
	chanDemo()

但是上面会造成死锁,因为,发送第一次数据的时候,在协程中,往done中发送了数据,没有接收,一直在done<-true等待,第二次往in中再发送数据,没有接收,造成死锁。

 解决:

1. 我们可以在在协程里再开一个协程,专门往done中发送数据,那么从in中接收数据的协程就不会阻塞了。

package main

import (
	"fmt"
)

type worker struct 
	in   chan int
	done chan bool


func doWorker(id int, in chan int, done chan bool) 
	for n := range in 
		fmt.Printf("worker %d, reseived %d\\n", id, n)
		// //说明任务结束
		go func() 
			done <- true
		()
	


//返回一个channel
func createWoker(id int) worker 
	w := worker
		in:   make(chan int),
		done: make(chan bool),
	
	go doWorker(id, w.in, w.done)
	return w


func chanDemo() 
	var channels [10]worker
	for i := 0; i < 10; i++ 
		channels[i] = createWoker(i)
	

	//发送一次
	for i, worker := range channels 
		worker.in <- i
	
	//发送第二次
	for i, worker := range channels 
		worker.in <- i * 10
	

	//等待任务结束
	for _, worker := range channels 
		//发送了两次,接收两次
		<-worker.done
		<-worker.done
	



func main() 
	chanDemo()

 解决2:使用sync官方库的waitgroup

package main

import (
	"fmt"
	"sync"
)

type worker struct 
	in chan int
	wg *sync.WaitGroup


func doWorker(id int, in chan int, wg *sync.WaitGroup) 
	for n := range in 
		fmt.Printf("worker %d, reseived %d\\n", id, n)

		//任务做完
		wg.Done()
	


//返回一个channel
func createWoker(id int, wg *sync.WaitGroup) worker 
	w := worker
		in: make(chan int),
		wg: wg,
	
	go doWorker(id, w.in, w.wg)
	return w


func chanDemo() 
	var wg sync.WaitGroup

	var channels [10]worker
	for i := 0; i < 10; i++ 
		channels[i] = createWoker(i, &wg)
	

	//增加20个任务进行等待,总共有20个任务
	wg.Add(20)
	//发送一次
	for i, worker := range channels 
		worker.in <- i
	
	//发送第二次
	for i, worker := range channels 
		worker.in <- i * 10
	

	//等待任务结束
	wg.Wait()


func main() 
	chanDemo()

 

以上是关于go语言管道(channel)的主要内容,如果未能解决你的问题,请参考以下文章

7.2 什么是Go语言中的管道Channel

7.2 什么是Go语言中的管道Channel

go语言管道(channel)

go语言管道(channel)

Golang channel源码分析

Nil Channels Always Block(Go语言中空管道总是阻塞)