Go-并发模式2(Patterns)
Posted lady_killer9
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go-并发模式2(Patterns)相关的知识,希望对你有一定的参考价值。
目录
上篇文章,讲到了goroutine和channel,进行了简单的通信,接下来看看有哪些模式。
模式1 Generator:返回channel的函数
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := boring("boring!") // Function returning a channel.
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
boring返回一个channel,不断往里写数据。main调用,并从channel中获取数据,结果如下:
通道做句柄
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann) // ann will block if joe is not ready to give a value
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
返回多个boring服务,channel做服务的句柄(也就是唯一标识)。返回channel时,通道没有数据会阻塞,按顺序来即可保证输出顺序。 结果如下:
模式2 扇入(fan in):多个channel并入一个
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c) // HL
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
不考虑顺序时可以这样使用,类似开车时的汇入,“前方小心右侧车辆汇入”。
input1、input2和c的关系如下图所示:
阻塞与按序恢复
package main
import (
"fmt"
"math/rand"
"time"
)
type Message struct {
str string
wait chan bool
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 5; i++ {
msg1 := <-c; fmt.Println(msg1.str)
msg2 := <-c; fmt.Println(msg2.str)
msg1.wait <- true // block boring, false is also ok
msg2.wait <- true
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan Message { // Returns receive-only channel of strings.
c := make(chan Message)
waitForIt := make(chan bool) // Shared between all messages.
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
<-waitForIt // to be blocked
}
}()
return c // Return the channel to the caller.
}
func fanIn(inputs ... <-chan Message) <-chan Message {
c := make(chan Message)
for i := range inputs {
input := inputs[i] // New instance of 'input' for each loop.
go func() { for { c <- <-input } }()
}
return c
}
乱序到达,通过通道形成加锁同步。通道作为锁,
<-waitForIt
会阻塞,直到有数据。
主函数运行,第一句话,内存中有一个channel(称为),存放Message类型,有,放入后阻塞。同样有和。
fanIn函数将两个channel的数据放到一个channel c,之后按序输出和去除阻塞(去除阻塞就是向对应的WaitForIt channel写一个数据,所以true还是false无所谓)。结果如下:
通道关系如下:
与Select结合
针对并发特有的控制结构。和switch很像,但每个case不是表达式而是通信,当有多个case可以时,将伪随机选择一个,所以不能依赖select来做顺序通信。
Rob给了例子,下方的select 10,篇幅原因就不展示了。
扇入(fan In)模式与Select的结合
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring1; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}
fanIn不再使用多个goroutine了,而是使用一个goroutine,在其中有无限循环和select。
模式3:通信超时
单次超时
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <-time.After(1 * time.Second): // if you change it to more than 1.5 seconds it will not block, eg. 5
fmt.Println("You're too slow.") // it's time to stop last case
return
}
}
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
有的时候boring服务可能是页面访问,获取资源等服务,我们并不清楚需要多长时间,但是我们有一个时间上限。这个时候可以使用库函数,time.After,到达等待时间后它会返回一个channel,这时我们可以退出程序。
总体超时
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
我们只定义一个timeout,到达后就退出。
模式4:自定义退出
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
// modify by lady_killer9
fmt.Println("You're boring!")
quit <- true
}
func boring(msg string, quit <-chan bool) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
return
}
}
}()
return c
}
想退出时,在select的某个return的case对应的channel中写入数据
安全的退出
package main
import (
"fmt"
"math/rand"
"time"
)
func cleanup() {
// added by lady_killer9
fmt.Println("clean up ")
}
func main() {
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- "Bye!"
fmt.Printf("Joe says: %q\\n", <-quit)
}
func boring(msg string, quit chan string) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
cleanup()
quit <- "See you!"
return
}
}
}()
return c
}
有的时候,我们收到要关闭的消息后,需要进行文件关闭等清理工作,之后再告诉主程序我们清理完毕,可以退出了,防止内存泄露,文件占用等情况。
goroutine的速度
package main
import (
"fmt"
"time"
)
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
start := time.Now()
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
fmt.Println(time.Since(start))
}
类似于传话筒游戏,我们不断的右耳朵进,左耳朵出。这里使用了10000个goroutine,结果如下
大概花了3ms多一点,就完成了10000个goroutine的通信,如果使用Python等其他语言是很难达到的,这就是goroutine,简单,高效。
接下来是一个模式使用的例子
Google Search
Google搜索是一个很好的例子,我们输入问题,然后Google发给多个后端程序进行搜索,可能是网页,图片,视频等,最后将结果进行一个汇总并返回。接下来进行一个仿造:
Google Search 1.0
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
return
}
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
在Google时,只是将结果放入结果队列依次放入会等待上一个结果出来。
等待太浪费时间了,我们可以使用goroutine
Google Search 2.0
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
for i := 0; i < 3; i++ {
result := <-c
results = append(results, result)
}
return
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
使用goroutine就不必等待上一个结果出来
Google Search 2.1
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
在Google 2.0的fan In模式的基础上,增加了总体超时模式,超过时不再等待其他结果。
我们如何避免丢弃慢速服务器的结果呢?例如,上面的video被丢弃了
答:复制服务器。向多个副本发送请求,并使用第一个响应。
Google Search 3.0
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
result := First("golang",
fakeSearch("replica 1"),
fakeSearch("replica 2"))
elapsed := time.Since(start)
fmt.Println(result)
fmt.Println(elapsed)
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
}
}
对于同一个问题,我们启用多个副本,返回最快的服务器搜索到的结果
接下来,我们针对web、image、video都启用多个服务器进行搜索
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web1 = fakeSearch("web1")
Web2 = fakeSearch("web2")
Image1 = fakeSearch("image1")
Image2 = fakeSearch("image2")
Video1 = fakeSearch("video1")
Video2 = fakeSearch("video2")
)
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) } ()
go func() { c <- First(query, Image1, Image2) } ()
go func() { c <- First(query, Video1, Video2) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) {
c <- replicas[i](query)
}
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
通过多个副本,选择最快的一个的方式,基本可以保证每种类型的结果都能在超时时间内完成。
参考
Google IO 2012 Go Concurrency Patterns
更多Go相关内容:Go-Golang学习总结笔记
有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。
以上是关于Go-并发模式2(Patterns)的主要内容,如果未能解决你的问题,请参考以下文章
Go-并发模式总结(扇入模式,超时模式,callback模式等)