使用Go的sync.ErrGroup类快速并行搜索文件
Posted henreash
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Go的sync.ErrGroup类快速并行搜索文件相关的知识,希望对你有一定的参考价值。
Go的主要特性之一是其强大的并发性能,如通道和goroutines。但是对于新手来说,goroutines是一个陌生的概念,新手在掌握并发概念过程中,经常遇到挫折。
Go团队发布的第一个帮助管理goroutines复杂性的工具是sync.WaitGroup,创建一个WaitGroup,该WaitGroup将阻塞,直到指定数量的goroutines完成执行为止。以下是文档中的一个例子:
var wg sync.WaitGroup
var urls = []string
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
for _, url := range urls
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string)
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
(url)
// Wait for all HTTP fetches to complete.
wg.Wait()
WaitGroups大大简化了在Go中处理并发的过程,减少了启动goroutines时必须进行的计算。每次启动goroutine时,调用Add()来增加WaitGroup计数。完成后,调用wg.Done()。调用wg.Wait(),会阻塞直到全部任务完成。但有一个缺点,在goroutines中报错,很难找出错误是什么。
sync.WaitGroup功能扩展
最近,Go提供了一个sync.ErrGroup包。sync.ErrGroup扩展了sync.WaitGroup,添加了错误传播和在发生不可恢复的错误或超时时取消整个goroutines集的能力。下面是使用ErrGroup重写的相同示例:
var g errgroup.Group
var urls = []string
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
for _, url := range urls
// Launch a goroutine to fetch the URL.
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error
// Fetch the URL.
resp, err := http.Get(url)
if err == nil
resp.Body.Close()
return err
)
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil
fmt.Println("Successfully fetched all URLs.")
上面的g.Go()函数是一个包装器,传递一个匿名函数,但仍可以捕获它可能返回的错误,避免使用额外的错误处理代码。提高使用goroutines的开发效率。
为测试sync.ErrGroup的功能,下面提供了一个小程序,递归地搜索一个目录,查找具有指定模式的Go文件。这个功能可以用于在使用包的项目中,查找go源码目录树中过期或更新过的文件。为测试其他特性,还为应用程序添加了时间限制。如果达到时间限制,所有的搜索和处理goroutines将被取消,程序将退出。
在范例程序目录中运行代码,产生以下结果:
$ gogrep -timeout 1000ms . fmt
gogrep.go
1 hits
如果输入错误的参数,将会提示正确的用法:
gogrep by Brian Ketelsen
Flags:
-timeout duration
timeout in milliseconds (default 500ms)
Usage:
gogrep [flags] path pattern
sync.ErrGroup简化代码
下面看一下sync.ErrGroup是如何简化代码的。为易于理解编写main()函数,其他函数在main中被调用。
package main
import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"time"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
func main()
duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")
flag.Usage = func()
fmt.Printf("%s by Brian Ketelsen\\n", os.Args[0])
fmt.Println("Usage:")
fmt.Printf(" gogrep [flags] path pattern \\n")
fmt.Println("Flags:")
flag.PrintDefaults()
flag.Parse()
if flag.NArg() != 2
flag.Usage()
os.Exit(-1)
path := flag.Arg(0)
pattern := flag.Arg(1)
ctx, _ := context.WithTimeout(context.Background(), *duration)
m, err := search(ctx, path, pattern)
if err != nil
log.Fatal(err)
for _, name := range m
fmt.Println(name)
fmt.Println(len(m), "hits")
前15行配置程序期望参数及提示信息,在参数传递错误时,打印友好的提示信息。有用的代码从第16行开始:
ctx, _ := context.WithTimeout(context.Background(), *duration)
这里,创建了新的context.Context实例,指定超时时间。duration变量即为超时时长。当运行超时时,ctx和从其继承的所有上下文将在通道上收到一条消息,提醒它们超时。WithTimeout还返回一个我们不需要的cancel函数,将其赋值为" _ "忽略它。
下一行调用search()函数,传递了ctx实例、搜索路径和搜索模式参数。最后,将结果打印到终端,然后打印搜索结果的计数。
分析search() 函数
search()函数比main()稍长一些,将对它进行分别解释。
首先创建一个新的errgroup实例,其中包含了传递过来的上下文变量,并发执行后续的所有过程。
func search(ctx context.Context, root string, pattern string) ([]string, error)
g, ctx := errgroup.WithContext(ctx)
接下来,创建了一个通道来保存搜索到的文件。稍后把搜索候选对象发送到此通道进行进一步处理,以确定它们是否与提供的模式匹配。这个通道有一个100的缓冲区,因此可以在文件搜索goroutines完成之前开始处理goroutines。
paths := make(chan string, 100)
errgroup类型有两个方法:Wait和Go。Go方法启动子任务,Wait方法阻塞例程直至所有子任务完成。这里给Go传递一个可返回错误信息的匿名函数。
g.Go(func() error
接下来,延迟关闭“paths”通道,以表示所有目录搜索已经完成。这样可以使用Go的“range”语句来启动多个goroutines处理候选文件。
defer close(paths)
最后,使用filepath包的Walk()函数递归遍历命令行参数中指定的目录中的所有文件。它检查文件是可读的,如果文件不是.go为后缀则不进行处理。在非Go源码文件中搜索Go源代码是没有意义的。
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error
if err != nil
return err
if !info.Mode().IsRegular()
return nil
if !info.IsDir() && !strings.HasSuffix(info.Name(), ".go")
return nil
见证sync.Errgroup的威力
符合上面三个条件的文件被抛弃,它不是我们要搜索的候选对象。其他都是我们要检查的Go源文件。sync.Errgroup的威力开始显现。在两个可能的情况下使用select语句。第一种情况是将文件名发送到另一个goroutine将搜索其内容的“paths”通道。第二种情况是等待上下文超时。只要未超时,就会继续发送当前文件进行处理。当计时器过期时,上下文的Done通道将发送一条捕获到的消息,导致goroutine返回,从而停止文件搜索。
select
case paths <- path:
case <-ctx.Done():
return ctx.Err()
return nil
)
)
接下来,我创建了一个通道来存储与搜索模式匹配的所有文件。
c := make(chan string,100)
现在可以遍历paths通道,并搜索其中的内容。
for path := range paths
需要指出的是:因为goroutine是一个闭包,它会在执行的时候捕获周围变量的值。因为多个goroutine并行执行,所以需要首先获取path变量的当前值,否则所有goroutine都将对path变量的值进行操作,该值可能在for循环的下一次迭代中更改。这是必须避免的。
p := path
现在对每个候选文件启动一个匿名函数。函数读取go源码文件,检查是否含有与搜索模式匹配的内容。
g.Go(func() error
data, err := ioutil.ReadFile(p)
if err != nil
return err
if !bytes.Contains(data, []byte(pattern))
return nil
同样,再次使用select语句在处理完成之前监视是否超时。
select
case c <- p:
case <-ctx.Done():
return ctx.Err()
return nil
)
这个函数将等待errgroup上的所有goroutines完成,然后关闭结果通道,发出所有处理完成的信号并终止上面的range语句。
go func()
g.Wait()
close(c)
()
现在在通道中获取结果,存入切片,返回给main函数中。
var m []string
for r := range c
m = append(m, r)
最后,检查errgroup中的错误。如果上面的任何goroutines返回错误,都会将其它返回给main()。
return m, g.Wait()
这个小程序远非最优。例如,在测试模式匹配的内容之前,我将源代码文件的全部内容读入内存。使用流来读取会更有效率。任意通道缓冲区大小为100意味着最多可以有100个不同的goroutine将文件读入内存。如果您有一个包含大量Go源文件的大目录,它可能会显著降低您的内存消耗。我能够在几秒钟内搜索我的整个Go源代码树来查找“fmt”包,而没有任何CPU或内存使用量的峰值,所以暂不做调整。
以上是关于使用Go的sync.ErrGroup类快速并行搜索文件的主要内容,如果未能解决你的问题,请参考以下文章