Go-并发模式2(Patterns)

Posted lady_killer9

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go-并发模式2(Patterns)相关的知识,希望对你有一定的参考价值。

目录

模式1 Generator:返回channel的函数

 通道做句柄

 模式2 扇入(fan in):多个channel并入一个

阻塞与按序恢复

 与Select结合

扇入(fan In)模式与Select的结合

模式3:通信超时

单次超时

总体超时

模式4:自定义退出

 安全的退出

goroutine的速度

Google Search

Google Search 1.0

Google Search 2.0

Google Search 2.1

Google Search 3.0

参考


上篇文章,讲到了goroutine和channel,进行了简单的通信,接下来看看有哪些模式。

模式1 Generator:返回channel的函数

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := boring("boring!") // Function returning a channel.
	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\\n", <-c)
	}
	fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings. 
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}

 boring返回一个channel,不断往里写数据。main调用,并从channel中获取数据,结果如下:

 通道做句柄

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	joe := boring("Joe")
	ann := boring("Ann")
	for i := 0; i < 5; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ann) // ann will block if joe is not ready to give a value
	}
	fmt.Println("You're both boring; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}

返回多个boring服务,channel做服务的句柄(也就是唯一标识)。返回channel时,通道没有数据会阻塞,按顺序来即可保证输出顺序。 结果如下:

 模式2 扇入(fan in):多个channel并入一个

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-c) // HL
	}
	fmt.Println("You're both boring; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}


func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() { for { c <- <-input1 } }()
	go func() { for { c <- <-input2 } }()
	return c
}

不考虑顺序时可以这样使用,类似开车时的汇入,“前方小心右侧车辆汇入”。

input1、input2和c的关系如下图所示: 

来源参考PPT

阻塞与按序恢复

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Message struct {
	str string
	wait chan bool
}

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))

	for i := 0; i < 5; i++ {
		msg1 := <-c; fmt.Println(msg1.str)
		msg2 := <-c; fmt.Println(msg2.str)
		msg1.wait <- true // block boring, false is also ok
		msg2.wait <- true
	}

	fmt.Println("You're both boring; I'm leaving.")
}

func boring(msg string) <-chan Message { // Returns receive-only channel of strings.
	c := make(chan Message)

	waitForIt := make(chan bool) // Shared between all messages.

	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {

			c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
			time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
			<-waitForIt // to be blocked
		}
	}()
	return c // Return the channel to the caller.
}

func fanIn(inputs ... <-chan Message) <-chan Message {
	c := make(chan Message)
	for i := range inputs {
		input := inputs[i] // New instance of 'input' for each loop.
		go func() { for { c <- <-input } }()
	}
	return c
}

乱序到达,通过通道形成加锁同步。通道作为锁,

<-waitForIt

会阻塞,直到有数据。

主函数运行,第一句话,内存中有一个channel(称为),存放Message类型,有,放入后阻塞。同样有

fanIn函数将两个channel的数据放到一个channel c,之后按序输出和去除阻塞(去除阻塞就是向对应的WaitForIt channel写一个数据,所以true还是false无所谓)。结果如下:

通道关系如下:

 与Select结合

针对并发特有的控制结构。和switch很像,但每个case不是表达式而是通信,当有多个case可以时,将伪随机选择一个,所以不能依赖select来做顺序通信。

Rob给了例子,下方的select 10,篇幅原因就不展示了。

扇入(fan In)模式与Select的结合

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-c)
	}
	fmt.Println("You're both boring1; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}

func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:  c <- s
			case s := <-input2:  c <- s
			}
		}
	}()
	return c
}

fanIn不再使用多个goroutine了,而是使用一个goroutine,在其中有无限循环和select。 

模式3:通信超时

单次超时

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := boring("Joe")
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-time.After(1 * time.Second): // if you change it to more than 1.5 seconds it will not block, eg. 5
			fmt.Println("You're too slow.") // it's time to stop last case
			return
		}
	}
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}

有的时候boring服务可能是页面访问,获取资源等服务,我们并不清楚需要多长时间,但是我们有一个时间上限。这个时候可以使用库函数,time.After,到达等待时间后它会返回一个channel,这时我们可以退出程序。

总体超时

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := boring("Joe")
	timeout := time.After(5 * time.Second)
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-timeout:
			fmt.Println("You talk too much.")
			return
		}
	}
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}

我们只定义一个timeout,到达后就退出。

模式4:自定义退出

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	quit := make(chan bool)
	c := boring("Joe", quit)
	for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
	// modify by lady_killer9
	fmt.Println("You're boring!")
	quit <- true
}

func boring(msg string, quit <-chan bool) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			select {
			case c <- fmt.Sprintf("%s: %d", msg, i):
				// do nothing
			case <-quit:
				return
			}
		}
	}()
	return c
}

 想退出时,在select的某个return的case对应的channel中写入数据

 安全的退出

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func cleanup() {
	// added by lady_killer9
	fmt.Println("clean up ")
}

func main() {
	quit := make(chan string)
	c := boring("Joe", quit)
	for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
	quit <- "Bye!"
	fmt.Printf("Joe says: %q\\n", <-quit)
}

func boring(msg string, quit chan string) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			select {
			case c <- fmt.Sprintf("%s: %d", msg, i):
				// do nothing
			case <-quit:
				cleanup()
				quit <- "See you!"
				return
			}
		}
	}()
	return c
}

有的时候,我们收到要关闭的消息后,需要进行文件关闭等清理工作,之后再告诉主程序我们清理完毕,可以退出了,防止内存泄露,文件占用等情况。

goroutine的速度

package main

import (
	"fmt"
	"time"
)

func f(left, right chan int) {
	left <- 1 + <-right
}

func main() {
	const n = 10000
	leftmost := make(chan int)
	right := leftmost
	left := leftmost
	for i := 0; i < n; i++ {
		right = make(chan int)
		go f(left, right)
		left = right
	}
	start := time.Now()
	go func(c chan int) { c <- 1 }(right)
	fmt.Println(<-leftmost)
	fmt.Println(time.Since(start))
}

 类似于传话筒游戏,我们不断的右耳朵进,左耳朵出。这里使用了10000个goroutine,结果如下

 大概花了3ms多一点,就完成了10000个goroutine的通信,如果使用Python等其他语言是很难达到的,这就是goroutine,简单,高效。

接下来是一个模式使用的例子

Google Search

Google搜索是一个很好的例子,我们输入问题,然后Google发给多个后端程序进行搜索,可能是网页,图片,视频等,最后将结果进行一个汇总并返回。接下来进行一个仿造:

Google Search 1.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string

func Google(query string) (results []Result) {
	results = append(results, Web(query))
	results = append(results, Image(query))
	results = append(results, Video(query))
	return
}

var (
	Web = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
        return func(query string) Result {
	          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	          return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
        }
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

在Google时,只是将结果放入结果队列依次放入会等待上一个结果出来

 等待太浪费时间了,我们可以使用goroutine

Google Search 2.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)


func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- Web(query) } ()
	go func() { c <- Image(query) } ()
	go func() { c <- Video(query) } ()

	for i := 0; i < 3; i++ {
		result := <-c
		results = append(results, result)
	}
	return
}

func fakeSearch(kind string) Search {
        return func(query string) Result {
	          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	          return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
        }
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

使用goroutine就不必等待上一个结果出来 

Google Search 2.1

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- Web(query) } ()
	go func() { c <- Image(query) } ()
	go func() { c <- Video(query) } ()

	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}

func fakeSearch(kind string) Search {
        return func(query string) Result {
	          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	          return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
        }
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

在Google 2.0的fan In模式的基础上,增加了总体超时模式,超过时不再等待其他结果。

我们如何避免丢弃慢速服务器的结果呢?例如,上面的video被丢弃了
答:复制服务器。向多个副本发送请求,并使用第一个响应

Google Search 3.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

func First(query string, replicas ...Search) Result {
	c := make(chan Result)
	searchReplica := func(i int) { c <- replicas[i](query) }
	for i := range replicas {
		go searchReplica(i)
	}
	return <-c
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	result := First("golang",
		fakeSearch("replica 1"),
		fakeSearch("replica 2"))
	elapsed := time.Since(start)
	fmt.Println(result)
	fmt.Println(elapsed)
}

func fakeSearch(kind string) Search {
        return func(query string) Result {
	          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	          return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
        }
}

对于同一个问题,我们启用多个副本,返回最快的服务器搜索到的结果

 接下来,我们针对web、image、video都启用多个服务器进行搜索

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web1 = fakeSearch("web1")
	Web2 = fakeSearch("web2")
	Image1 = fakeSearch("image1")
	Image2 = fakeSearch("image2")
	Video1 = fakeSearch("video1")
	Video2 = fakeSearch("video2")
)

func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- First(query, Web1, Web2) } ()
	go func() { c <- First(query, Image1, Image2) } ()
	go func() { c <- First(query, Video1, Video2) } ()
	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}

func First(query string, replicas ...Search) Result {
	c := make(chan Result)
	searchReplica := func(i int) {
		c <- replicas[i](query)
	}
	for i := range replicas {
		go searchReplica(i)
	}
        return <-c
}

func fakeSearch(kind string) Search {
        return func(query string) Result {
	          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	          return Result(fmt.Sprintf("%s result for %q\\n", kind, query))
        }
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

通过多个副本,选择最快的一个的方式,基本可以保证每种类型的结果都能在超时时间内完成。

参考

Google IO 2012 Go Concurrency Patterns

b站中文字幕版

PPT

全部代码

 更多Go相关内容:Go-Golang学习总结笔记

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

以上是关于Go-并发模式2(Patterns)的主要内容,如果未能解决你的问题,请参考以下文章

Go-并发模式总结(扇入模式,超时模式,callback模式等)

golang代码片段(摘抄)

[Go] 并发和并行的区别

Go 并发模式 - 模式 #2

[Go] 通过 17 个简短代码片段,切底弄懂 channel 基础

通过 SingleFlight 模式学习 Go 并发编程