go语言管道(channel)
Posted 两片空白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go语言管道(channel)相关的知识,希望对你有一定的参考价值。
前言
channel式go语言协程中数据通信的双向通道。但是在实际应用中,为了代码的简单和易懂,一般使用的channel是单向的。
使用
1. channel的定义和收发数据
package channel
func main()
//var c chan int c的默认值位nil,一般不使用
c := make(chan int)
c <- 1 //发数据
n := <-c //收数据
但是上面收发数据的写法是错误的,因为一个协程往管道里发送数据,必须要有一个协程来接受数据,否则会造成死锁。(协程往管道发送数据和从管道接受数据为了防止数据紊乱,会加上锁)
注意:
协程从管道里接收数据时,当没有收到数据时,会一直等待。这样效率不是很高,使用select,可以实现同步的功能,后面有介绍。
正确写法:
package main
import (
"fmt"
"time"
)
func chanDemo()
c := make(chan int)
go func()
for
n := <-c //收数据
fmt.Println(n)
()
c <- 1 //发数据
c <- 2
//防止数据还没有在协程里打印,main函数退出
time.Sleep(time.Millisecond)
func main()
chanDemo()
2. channel作为参数
channel在go语言里作为一等公民,可以作为参数和返回值
package main
import (
"fmt"
"time"
)
func worker(c chan int)
for
n := <-c
fmt.Println(n)
func chanDemo()
var channels [10]chan int
for i := 0; i < 10; i++
channels[i] = make(chan int)
go worker(channels[i])
for i := 0; i < 10; i++
channels[i] <- i
//防止数据还没有在协程里打印,main函数退出
time.Sleep(time.Second)
func main()
chanDemo()
3. channel做返回值
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int)
for
fmt.Printf("worker %d, reseived %d\\n", id, <-c)
//返回一个channel
func createWoker(id int) chan int
c := make(chan int)
go worker(id, c)
return c
func chanDemo()
var channels [10]chan int
for i := 0; i < 10; i++
channels[i] = createWoker(i)
for i := 0; i < 10; i++
channels[i] <- i
//防止数据还没有在协程里打印,main函数退出
time.Sleep(time.Second)
func main()
chanDemo()
但是在实际应用中,当项目很复杂时,createWoker返回的channel需要告诉调用方,返回的channel该如何使用,我们可以在返回的channel加上限制。如下:
//返回一个channel,只能被接收数据
func createWoker(id int) <-chan int
c := make(chan int)
go worker(id, c)
return c
//返回一个channel,只能发送数据
func createWoker(id int) chan<- int
c := make(chan int)
go worker(id, c)
return c
Channel缓存
上面说,当一个协程向channel发送数据时,必须要有一个协程来接收数据,否则会造成死锁。
package main
import (
"fmt"
"time"
)
func channel()
c := make(chan int)
c <- 1
func main()
channel()
但是我们可以建立一个channel缓存,先将数据放到缓存中,需要时再使用,即使,没有协程接收也不会造成死锁。
package main
import (
"fmt"
"time"
)
func bufferedChannel()
//channel缓存
c := make(chan int, 3)
c <- 1
c <- 2
c <- 3
func main()
bufferedChannel()
关闭channel,并不是销毁
当发送完数据,channel是可以被关闭的。
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int)
for
fmt.Printf("worker %d, reseived %d\\n", id, <-c)
func closeChannel()
c := make(chan int)
go worker(0, c)
c <- 1
c <- 2
c <- 3
close(c)
time.Sleep(time.Millisecond)
func main()
bufferedChannel()
当发送方关闭channel后,接收方还是可以从channel接收到数据,接收到的数据是后面类型的默认值。
但是,这样是不合理的,接收方同样可以判断发送方是否关闭了channel
有两种方法可以判断发送方是否关闭了channel
方法一:利用判断。
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int)
for
n, ok := <-c
//说明发送方关闭了channel
if !ok
break
fmt.Printf("worker %d, reseived %d\\n", id, n)
func closeChannel()
c := make(chan int)
go worker(0, c)
c <- 1
c <- 2
c <- 3
close(c)
time.Sleep(time.Millisecond)
func main()
closeChannel()
方法二:利用range,当发送方关闭了channel,range也会自动退出。
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int)
for n := range c
fmt.Printf("worker %d, reseived %d\\n", id, n)
func closeChannel()
c := make(chan int)
go worker(0, c)
c <- 1
c <- 2
c <- 3
close(c)
time.Sleep(time.Millisecond)
func main()
closeChannel()
使用channel等待任务结束
上面在每一个函数中,我们都加了一段延时,来等待协程的结束。但是,这种做法很不好。
处理一般是,当协程任务结束,通知外层任务结束。一种思想,通过通信来共享内存,而不是通过共享内存来通信。延时就不需要了。
方法一:用一个channel来通知外层
package main
import (
"fmt"
)
type worker struct
in chan int
done chan bool
func doWorker(id int, in chan int, done chan bool)
for n := range in
fmt.Printf("worker %d, reseived %d\\n", id, n)
//说明任务结束
done <- true
//返回一个channel
func createWoker(id int) worker
w := worker
in: make(chan int),
done: make(chan bool),
go doWorker(id, w.in, w.done)
return w
func chanDemo()
var channels [10]worker
for i := 0; i < 10; i++
channels[i] = createWoker(i)
for i := 0; i < 10; i++
channels[i].in <- i
//等到任务结束
<-channels[i].done
func main()
chanDemo()
注意:
//等到任务结束 <-channels[i].done
channel发送和接收数据时阻塞式的,必须要有人接收和发送数据,不然会一直等待。
如果在doWorker中,不返回任务结束的消息,会造成死锁。
package main
import (
"fmt"
)
type worker struct
in chan int
done chan bool
func doWorker(id int, in chan int, done chan bool)
for n := range in
fmt.Printf("worker %d, reseived %d\\n", id, n)
// //说明任务结束
// done <- true
//返回一个channel
func createWoker(id int) worker
w := worker
in: make(chan int),
done: make(chan bool),
go doWorker(id, w.in, w.done)
return w
func chanDemo()
var channels [10]worker
for i := 0; i < 10; i++
channels[i] = createWoker(i)
for i := 0; i < 10; i++
channels[i].in <- i
//等到任务结束
<-channels[i].done
func main()
chanDemo()
但是,上面正确的情况打印是顺序打印的,没有体现并行的好处。这是因为发送数据时,是发一个数据,等一个数据。我们需要做到,将数据全部发出去,再进行全部的等待。
package main
import (
"fmt"
)
type worker struct
in chan int
done chan bool
func doWorker(id int, in chan int, done chan bool)
for n := range in
fmt.Printf("worker %d, reseived %d\\n", id, n)
// //说明任务结束
done <- true
//返回一个channel
func createWoker(id int) worker
w := worker
in: make(chan int),
done: make(chan bool),
go doWorker(id, w.in, w.done)
return w
func chanDemo()
var channels [10]worker
for i := 0; i < 10; i++
channels[i] = createWoker(i)
//发送一次
for i, worker := range channels
worker.in <- i
//发送第二次
for i, worker := range channels
worker.in <- i * 10
//等待任务结束
for _, worker := range channels
//发送了两次,接收两次
<-worker.done
<-worker.done
func main()
chanDemo()
但是上面会造成死锁,因为,发送第一次数据的时候,在协程中,往done中发送了数据,没有接收,一直在done<-true等待,第二次往in中再发送数据,没有接收,造成死锁。
解决:
1. 我们可以在在协程里再开一个协程,专门往done中发送数据,那么从in中接收数据的协程就不会阻塞了。
package main
import (
"fmt"
)
type worker struct
in chan int
done chan bool
func doWorker(id int, in chan int, done chan bool)
for n := range in
fmt.Printf("worker %d, reseived %d\\n", id, n)
// //说明任务结束
go func()
done <- true
()
//返回一个channel
func createWoker(id int) worker
w := worker
in: make(chan int),
done: make(chan bool),
go doWorker(id, w.in, w.done)
return w
func chanDemo()
var channels [10]worker
for i := 0; i < 10; i++
channels[i] = createWoker(i)
//发送一次
for i, worker := range channels
worker.in <- i
//发送第二次
for i, worker := range channels
worker.in <- i * 10
//等待任务结束
for _, worker := range channels
//发送了两次,接收两次
<-worker.done
<-worker.done
func main()
chanDemo()
解决2:使用sync官方库的waitgroup
package main
import (
"fmt"
"sync"
)
type worker struct
in chan int
wg *sync.WaitGroup
func doWorker(id int, in chan int, wg *sync.WaitGroup)
for n := range in
fmt.Printf("worker %d, reseived %d\\n", id, n)
//任务做完
wg.Done()
//返回一个channel
func createWoker(id int, wg *sync.WaitGroup) worker
w := worker
in: make(chan int),
wg: wg,
go doWorker(id, w.in, w.wg)
return w
func chanDemo()
var wg sync.WaitGroup
var channels [10]worker
for i := 0; i < 10; i++
channels[i] = createWoker(i, &wg)
//增加20个任务进行等待,总共有20个任务
wg.Add(20)
//发送一次
for i, worker := range channels
worker.in <- i
//发送第二次
for i, worker := range channels
worker.in <- i * 10
//等待任务结束
wg.Wait()
func main()
chanDemo()
以上是关于go语言管道(channel)的主要内容,如果未能解决你的问题,请参考以下文章