golang 管道并发模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang 管道并发模式相关的知识,希望对你有一定的参考价值。

package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	defer close(done)
	numbers := gen(2, 4, 6)
	c1 := sq(done, numbers)
	c2 := sq(done, numbers)
	c3 := sq(done, numbers)
	out := merge(done, c1, c2, c3)

	fmt.Println(<-out)
}

func gen(numbers ...int) <-chan int {
	out := make(chan int, len(numbers))
	for _, n := range numbers {
		out <- n
	}
	close(out)

	return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
			}
		}
	}()

	return out
}

func merge(done <-chan struct{}, in ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
				fmt.Println("Adding ", n)
			case <-done:
				fmt.Println("Exiting from ", n)
				return
			}
		}
	}

	wg.Add(len(in))
	for _, c := range in {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

Golang 管道

Golang 引用类型 channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。

channel概念

    a. 类似unix中管道(pipe
   b. 先进先出
   c. 线程安全,多个goroutine同时访问,不需要加锁
   d. channel是有类型的,一个整数的channel只能存放整数

channel声明
var 变量名 chan 类型

package main
var ch0 chan int
var ch1 chan string
var ch2 chan map[string]string
type stu struct{}
var ch3 chan stu
var ch4 chan *stu
func main() {
}

channel初始化

使用make进行初始化,比如:

package main
import (
   "fmt"
)
var ch0 chan int = make(chan int)
var ch1 chan int = make(chan int, 10)
func main() {
   var ch2 chan string
   ch2 = make(chan string)
   var ch3 chan string
   ch3 = make(chan string, 1)
   ch4 := make(chan float32)
   ch5 := make(chan float64, 2)
   fmt.Printf("无缓冲 全局变量 chan ch0 : %v\n", ch0)
   fmt.Printf("有缓冲 全局变量 chan ch1 : %v\n", ch1)
   fmt.Printf("无缓冲 局部变量 chan ch2 : %v\n", ch2)
   fmt.Printf("有缓冲 局部变量 chan ch3 : %v\n", ch3)
   fmt.Printf("无缓冲 局部变量 chan ch4 : %v\n", ch4)
   fmt.Printf("有缓冲 局部变量 chan ch5 : %v\n", ch5)
}

输出结果:

无缓冲 全局变量 chan ch0 : 0xc420070060
有缓冲 全局变量 chan ch1 : 0xc42001c0b0
无缓冲 局部变量 chan ch2 : 0xc4200700c0
有缓冲 局部变量 chan ch3 : 0xc420054060
无缓冲 局部变量 chan ch4 : 0xc420070120
有缓冲 局部变量 chan ch5 : 0xc420050070

无缓冲的与有缓冲channel有着重大差别,那就是一个是同步的 一个是非同步的。

比如

c1:=make(chan int) 无缓冲

c2:=make(chan int,1) 有缓冲

c1<-1

无缓冲: 不仅仅是向 c1 通道放 1,而是一直要等有别的携程 <-c1 接手了这个参数,那么c1<-1才会继续下去,要不然就一直阻塞着。

有缓冲: c2<-1 则不会阻塞,因为缓冲大小是1(其实是缓冲大小为0),只有当放第二个值的时候,第一个还没被人拿走,这时候才会阻塞。

缓冲区是内部属性,并非类型构成要素。

var a, b chan int = make(chan int), make(chan int, 3)

channel基本操作

不同类型channel写入、读取

package main
import (
   "fmt"
)
type Stu struct {
   name string
}
func main() {
   //int类型
   var intChan chan int
   intChan = make(chan int, 10)
   intChan <- 10
   a := <-intChan
   fmt.Printf("int 类型 chan : %v\n", a)
   //map类型
   var mapChan chan map[string]string
   mapChan = make(chan map[string]string, 10)
   m := make(map[string]string, 16)
   m["stu01"] = "001"
   m["stu02"] = "002"
   m["stu03"] = "003"
   mapChan <- m
   b := <-mapChan
   fmt.Printf("map 类型 chan : %v\n", b)
   //结构体
   var stuChan chan Stu
   stuChan = make(chan Stu, 10)
   stu := Stu{
       name: "Murphy",
   }
   stuChan <- stu
   tempStu := <-stuChan
   fmt.Printf("struct 类型 chan : %v\n", tempStu)
   //结构体内存地址值
   var stuChanId chan *Stu
   stuChanId = make(chan *Stu, 10)
   stuId := &Stu{
       name: "Murphy",
   }
   stuChanId <- stuId
   tempStuId := <-stuChanId
   fmt.Printf("*struct 类型 chan : %v\n", tempStuId)
   fmt.Printf("*struct 类型 chan 取值 : %v\n", *(tempStuId))
   //接口
   var StuInterChain chan interface{}
   StuInterChain = make(chan interface{}, 10)
   stuInit := Stu{
       name: "Murphy",
   }
   //存
   StuInterChain <- &stuInit
   //取
   mFetchStu := <-StuInterChain
   fmt.Printf("interface 类型 chan : %v\n", mFetchStu)
   //转
   var mStuConvert *Stu
   mStuConvert, ok := mFetchStu.(*Stu)
   if !ok {
       fmt.Println("cannot convert")
       return
   }
   fmt.Printf("interface chan转 *struct chan : %v\n", mStuConvert)
   fmt.Printf("interface chan转 *struct chan 取值 : %v\n", *(mStuConvert))
}

输出结果:

int 类型 chan : 10
map 类型 chan : map[stu02:002 stu03:003 stu01:001]
struct 类型 chan : {Murphy}
*struct 类型 chan : &{Murphy}
*struct 类型 chan 取值 : {Murphy}
interface 类型 chan : &{Murphy}
interface chan转 *struct chan : &{Murphy}
interface chan转 *struct chan 取值 : {Murphy}

channel 写入、读取、遍历、关闭:

package main
import (
   "fmt"
)
func main() {
   ch := make(chan int, 11)
   //写入chan
   ch <- 99
   for i := 0; i < 10; i++ {
       ch <- i
   }
   fmt.Printf("writed chan ch : %v\n", ch)
   //读取chan
   first_chan, ok := <-ch
   if ok {
       fmt.Printf("first chan is %v\n", first_chan)
   }
   ch <- 10
   //遍历chan
   for value := range ch {
       fmt.Println(value)
       if value == 10 {
           // 关闭chan
           close(ch)
           //break // 在这里break循环也可以
       }
   }
   fmt.Println("after range or close ch!")
}

输出结果:

first chan is 99
0
1
2
3
4
5
6
7
8
9
10
after range or close ch!

channel关闭
channel关闭后,就不能取出数据了

  1. 使用内置函数close进行关闭,chan关闭之后,for range遍历chan中已经存在的元素后结束

  2. 使用内置函数close进行关闭,chan关闭之后,没有使用for range的写法需要使用,v, ok := <- ch进行判断chan是否关闭

package main
import "fmt"
func main() {
   var ch chan int
   ch = make(chan int, 5)
   for i := 0; i < 5; i++ {
       ch <- i
   }
   close(ch)
   for {
       var b int
       b, ok := <-ch
       if ok == false {
           fmt.Println("chan is close")
           break
       }
       fmt.Println(b)
   }
}

输出结果:

0
1
2
3
4
chan is close

如果将close(ch)注释掉,意思是不关闭管道,那么会出现dead lock死锁 
因为存入管道5个数字,然后无限取数据,会出现死锁。

package main
import "fmt"
func main() {
   var ch chan int
   ch = make(chan int, 5)
   for i := 0; i < 5; i++ {
       ch <- i
   }
   // close(ch)
   for {
       var b int
       b, ok := <-ch
       if ok == false {
           fmt.Println("chan is close")
           break
       }
       fmt.Println(b)
   }
}

输出结果:

0
1
2
3
4
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
   /Users/***/Desktop/go/src/main.go:16 +0xfb
exit status 2

range 遍历 chan

package main
import "fmt"
func main() {
   var ch chan int
   ch = make(chan int, 10)
   for i := 0; i < 10; i++ {
       ch <- i
   }
   close(ch)
   for v := range ch {
       fmt.Println(v)
   }
}

输出结果:

0
1
2
3
4
5
6
7
8
9

同样如果将close(ch)注释掉,意思是不关闭管道,那么会出现dead lock死锁 
因为存入管道10个数字,然后无限取数据,在取出来第10个数据,在次range管道,会dead lock。

package main
import "fmt"
func main() {
   var ch chan int
   ch = make(chan int, 10)
   for i := 0; i < 10; i++ {
       ch <- i
   }
   // close(ch)
   for v := range ch {
       fmt.Println(v)
   }
}

输出结果:

0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
   /Users/***/Desktop/go/src/main.go:14 +0x106
exit status 2

除用 range 外,还可用 ok-idiom 模式判断 channel 是否关闭。

package main
import "fmt"
func main() {
   var ch chan int
   ch = make(chan int, 10)
   for i := 0; i < 10; i++ {
       ch <- i
   }
   close(ch)
   for {
       if d, ok := <-ch; ok {
           fmt.Println(d)
       } else {
           break
       }
   }
}

输出结果:

0
1
2
3
4
5
6
7
8
9

向 closed channel 发送数据引发 panic 错误,接收立即返回零值。而 nil channel, 无论收发都会被阻塞。

package main
func main() {
   ch := make(chan int, 1)
   close(ch)
   ch <- 2
}

输出结果:

panic: send on closed channel
goroutine 1 [running]:
main.main()
   /Users/***/Desktop/go/src/main.go:6 +0x63
exit status 2

内置函数 len 返回未被读取的缓冲元素数量,cap 返回缓冲区大小。

package main
import "fmt"
func main() {
   ch1 := make(chan int)
   ch2 := make(chan int, 3)
   ch2 <- 1
   fmt.Println(len(ch1), cap(ch1))
   fmt.Println(len(ch2), cap(ch2))
}

输出结果:

0 0
1 3

对chan进行select操作
select 语句类似于 switch 语句,但是select会随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。

package main
import (
   "fmt"
)
func main() {
   ch1 := make(chan int, 1)
   ch1 <- 1
   ch2 := make(chan int, 1)
   ch2 <- 2
   select {
   case k1 := <-ch1:
       fmt.Println(k1)
   case k2 := <-ch2:
       fmt.Println(k2)
   default:
       fmt.Println("chan")
   }
}

输出结果:

//结果1,2随机

chan的只读和只写

a. 只读chan的声明

var 变量名 <-chan 类型

package main
var ch0 <-chan int
var ch1 <-chan string
var ch2 <-chan map[string]string
type stu struct{}
var ch3 <-chan stu
var ch4 <-chan *stu
func main() {
}

b. 只写chan的声明

var 变量名 chan<- 类型

package main
var ch0 chan<- int
var ch1 chan<- string
var ch2 chan<- map[string]string
type stu struct{}
var ch3 chan<- stu
var ch4 chan<- *stu
func main() {
}

channel单向 :可以将 channel 隐式转换为单向队列,只收或只发。

package main
import (
   "fmt"
)
func main() {
   c := make(chan int, 3)
   var send chan<- int = c // send-only
   var recv <-chan int = c // receive-only
   send <- 1
   // <-send               // Error: receive from send-only type chan<- int
   val, ok := <-recv
   if ok {
       fmt.Println(val)
   }
   // recv <- 2           // Error: send to receive-only type <-chan int
}

输出结果:

1

不能将单向 channel 转换为普通 channel。

package main
func main() {
   c := make(chan int, 3)
   var send chan<- int = c // send-only
   var recv <-chan int = c // receive-only
   ch1 := (chan int)(send)
   // Error: cannot convert type chan<- int to type chan int
   ch2 := (chan int)(recv)
   // Error: cannot convert type <-chan int to type chan int
}

输出结果:

./main.go:8:19: cannot convert send (type chan<- int) to type chan int
./main.go:9:19: cannot convert recv (type <-chan int) to type chan int

channel 是第一类对象,可传参 (内部实现为指针) 或者作为结构成员。

package main
import "fmt"
type Request struct {
   data []int
   ret  chan int
}
func NewRequest(data ...int) *Request {
   return &Request{data, make(chan int, 1)}
}
func Process(req *Request) {
   x := 0
   for _, i := range req.data {
       x += i
   }
   req.ret <- x
}
func main() {
   req := NewRequest(10, 20, 30)
   Process(req)
   fmt.Println(<-req.ret)
}

输出结果:

60


以上是关于golang 管道并发模式的主要内容,如果未能解决你的问题,请参考以下文章

Golang并发(Go程管道)

(四十二)golang--管道

golang管道

Golang入门到项目实战 golang并发变成之通道channel

[转]Golang号称高并发,但高并发时性能不高

golang总结-并发