浅谈errgroup的使用以及源码分析

Posted huageyiyangdewo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈errgroup的使用以及源码分析相关的知识,希望对你有一定的参考价值。

本文讲解的是golang.org/x/sync这个包中的errgroup

1、errgroup 的基础介绍

学习过 Go 的朋友都知道 Go 实现并发编程是比较容易的事情,只需要使用go关键字就可以开启一个 goroutine。那对于并发场景中,如何实现goroutine的协调控制呢?常见的一种方式是使用sync.WaitGroup 来进行协调控制。

使用过sync.WaitGroup 的朋友知道,sync.WaitGroup 虽然可以实现协调控制,但是不能传递错误,那该如何解决呢?聪明的你可能马上想到使用 chan 或者是 context来传递错误,确实是可以的。那接下来,我们一起看看官方是怎么实现上面的需求的呢?

1.1 errgroup的安装

安装命令:

go get golang.org/x/sync

//下面的案例是基于v0.1.0 演示的
go get golang.org/x/sync@v0.1.0

1.2 errgroup的基础例子

这里我们需要请求3个url来获取数据,假设请求url2时报错,url3耗时比较久,需要等一秒。

package main

import (
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

func main()  
	queryUrls := map[string]string
		"url1": "http://localhost/url1",
		"url2": "http://localhost/url2",
		"url3": "http://localhost/url3",
	

	var eg errgroup.Group
	var results []string

	for _, url := range queryUrls 
		url := url
		eg.Go(func() error 
			result, err := query(url)
			if err != nil 
				return err
			
			results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
			return nil
		)
	
	
  // group 的wait方法,等待上面的 eg.Go 的协程执行完成,并且可以接受错误
	err := eg.Wait()
	if err != nil 
		fmt.Println("eg.Wait error:", err)
		return
	

	for k, v := range results 
		fmt.Printf("%v ---> %v\\n", k, v)
	


func query(url string) (ret string, err error) 
	// 假设这里是发送请求,获取数据
	if strings.Contains(url, "url2") 
		// 假设请求 url2 时出现错误
		fmt.Printf("请求 %s 中....\\n", url)
		return "", errors.New("请求超时")
	 else if strings.Contains(url, "url3") 
		// 假设 请求 url3 需要1秒
		time.Sleep(time.Second*1)
	
	fmt.Printf("请求 %s 中....\\n", url)
	return "success", nil

执行结果:

请求 http://localhost/url2 中....
请求 http://localhost/url1 中....
请求 http://localhost/url3 中....
eg.Wait error: 请求超时

果然,当其中一个goroutine出现错误时,会把goroutine中的错误传递出来。

我们自己运行一下上面的代码就会发现这样一个问题,请求 url2 出错了,但是依旧在请求 url3 。因为我们需要聚合 url1、url2、url3 的结果,所以当其中一个出现问题时,我们是可以做一个优化的,就是当其中一个出现错误时,取消还在执行的任务,直接返回结果,不用等待任务执行结果。

那应该如何做呢?

这里假设 url1 执行1秒,url2 执行报错,url3执行3秒。所以当url2报错后,就不用等url3执行结束就可以返回了。

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

func main()  
	queryUrls := map[string]string
		"url1": "http://localhost/url1",
		"url2": "http://localhost/url2",
		"url3": "http://localhost/url3",
	

	var results []string
	ctx, cancel := context.WithCancel(context.Background())
	eg, errCtx := errgroup.WithContext(ctx)

	for _, url := range queryUrls 
		url := url
		eg.Go(func() error 
			result, err := query(errCtx, url)
			if err != nil 
        //其实这里不用手动取消,看完源码就知道为啥了
				cancel()
				return err
			
			results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
			return nil
		)
	

	err := eg.Wait()
	if err != nil 
		fmt.Println("eg.Wait error:", err)
		return
	

	for k, v := range results 
		fmt.Printf("%v ---> %v\\n", k, v)
	



func query(errCtx context.Context, url string) (ret string, err error) 
	fmt.Printf("请求 %s 开始....\\n", url)
	// 假设这里是发送请求,获取数据
	if strings.Contains(url, "url2") 
		// 假设请求 url2 时出现错误
		time.Sleep(time.Second*2)
		return "", errors.New("请求出错")


	 else if strings.Contains(url, "url3") 
		// 假设 请求 url3 需要1秒
		select 
		case <- errCtx.Done():
			ret, err = "", errors.New("请求3被取消")
			return
		case <- time.After(time.Second*3):
			fmt.Printf("请求 %s 结束....\\n", url)
			return "success3", nil
		
	 else 
		select 
		case <- errCtx.Done():
			ret, err = "", errors.New("请求1被取消")
			return
		case <- time.After(time.Second):
			fmt.Printf("请求 %s 结束....\\n", url)
			return "success1", nil
		
	



执行结果:

请求 http://localhost/url2 开始....
请求 http://localhost/url3 开始....
请求 http://localhost/url1 开始....
请求 http://localhost/url1 结束....
eg.Wait error: 请求出错

2、errgroup源码分析

看了上面的例子,我们对errgroup有了一定了解,接下来,我们一起看看errgroup做了那些封装。

2.1 errgroup.Group

errgroup.Group源码如下:

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct 
  // context 的 cancel 方法
	cancel func()

	wg sync.WaitGroup
	
  //传递信号的通道,这里主要是用于控制并发创建 goroutine 的数量
  //通过 SetLimit 设置过后,同时创建的goroutine 最大数量为n
	sem chan token
	
  // 保证只接受一次错误
	errOnce sync.Once
  // 最先返回的错误
	err     error

看结构体中的内容,发现比原生的sync.WaitGroup多了下面的内容:

  • cancel func()
  • sem chan token
  • errOnce sync.Once
  • err error

2.2 WithContext 方法

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) 
	ctx, cancel := context.WithCancel(ctx)
	return &Groupcancel: cancel, ctx

方法逻辑还是比较简单的,主要做了两件事:

  • 使用contextWithCancel()方法创建一个可取消的Context
  • context.WithCancel(ctx)创建的 cancel赋值给 Group中的cancel

2.3 Go

1.2 最后一个例子说,不用手动去执行 cancel 的原因就在这里。

g.cancel() //这里就是为啥不用手动执行 cancel的原因

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group\'s context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) 
	if g.sem != nil 
    //往 sem 通道中发送空结构体,控制并发创建 goroutine 的数量
		g.sem <- token
	

	g.wg.Add(1)
	go func() 
    // done()函数的逻辑就是当 f 执行完后,从 sem 取一条数据,并且 g.wg.Done()
		defer g.done()

		if err := f(); err != nil 
			g.errOnce.Do(func()  // 这里就是确保 g.err 只被赋值一次
				g.err = err
				if g.cancel != nil 
					g.cancel() //这里就是为啥不用手动执行 cancel的原因
				
			)
		
	()

2.4 TryGo

看注释,知道此函数的逻辑是:当正在执行的goroutine数量小于通过SetLimit()设置的数量时,可以启动成功,返回 true,否则启动失败,返回false。

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool 
	if g.sem != nil 
		select 
		case g.sem <- token: // 当g.sem的缓冲区满了过后,就会执行default,也代表着未启动成功
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		
	
  
  //----主要看上面的逻辑,下面的逻辑和Go中的一样-------

	g.wg.Add(1)
	go func() 
		defer g.done()

		if err := f(); err != nil 
			g.errOnce.Do(func() 
				g.err = err
				if g.cancel != nil 
					g.cancel()
				
			)
		
	()
	return true

2.5 Wait

代码逻辑很简单,这里主要注意这里:

//我看这里的时候,有点疑惑,为啥这里会去调用 cancel()方法呢?
//这里是为了代码的健壮性,用 context.WithCancel() 创建得到的 cancel,在代码执行完毕之前取消是一个好习惯
g.cancel()

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error 
  g.wg.Wait() //通过 g.wg.Wait() 阻塞等待所有的 goroutine 执行完
	if g.cancel != nil 
    //我看这里的时候,有点疑惑,为啥这里会去调用 cancel()方法呢?
    //这里是为了代码的健壮性,用 context.WithCancel() 创建得到的 cancel,在代码执行完毕之前取消是一个好习惯
 		g.cancel()
	
	return g.err

2.6 SetLimit

看代码的注释,我们知道:SetLimit的逻辑主要是限制同时执行的 goroutines 的数量为n,当n小于0时,没有限制。如果有运行的 goroutine,调用此方法会报错。

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) 
	if n < 0 
		g.sem = nil
		return
	
	if len(g.sem) != 0 
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	
	g.sem = make(chan token, n)

3、errgroup 容易忽视的坑

这个坑是看别人的记录看到的,对errgroup不太熟悉时,是不小心确实容易掉进去,所以摘抄了过来,如果侵权,请联系删除,谢谢!

原文链接:并发编程包之 errgroup

需求:

开启多个Goroutine去缓存中设置数据,同时开启一个Goroutine去异步写日志,很快我的代码就写出来了:

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)

func main()  
	g, ctx := errgroup.WithContext(context.Background())

	// 单独开一个协程去做其他的事情,不参与waitGroup
	go WriteChangeLog(ctx)

	for i:=0 ; i< 3; i++
		g.Go(func() error 
			return errors.New("访问redis失败\\n")
		)
	
	if err := g.Wait();err != nil
		fmt.Printf("appear error and err is %s",err.Error())
	
	time.Sleep(1 * time.Second)


func WriteChangeLog(ctx context.Context) error 
	select 
	case <- ctx.Done():
		return nil
	case <- time.After(time.Millisecond * 50):
		fmt.Println("write changelog")
	
	return nil

结果:

appear error and err is 访问redis失败

代码看着没有问题,但是日志一直没有写入。这是为什么呢?

其实原因就是因为这个ctxerrgroup.WithContext方法返回的一个带取消的ctx,我们把这个ctx当作父context传入WriteChangeLog方法中了,如果errGroup取消了,也会导致上下文的context都取消了,所以WriteChangelog方法就一直执行不到。

这个点是我们在日常开发中想不到的,所以需要注意一下~。

解决方法:

解决方法就是在 go WriteChangeLog(context.Background()) 传入新的ctx

参考资料:

八. Go并发编程--errGroup

并发编程包之 errgroup

上面这个案例中讲了一个容易忽视的坑,大家可以看看

浅谈ElasticSearch架构以及集成

简介

Elasticsearch是一个高度可扩展的开源的分布式Restful全文搜索和分析引擎。它允许用户快速的(近实时的)存储、搜索和分析海量数据。它通常用作底层引擎技术,为具有复杂搜索功能和要求的应用程序提供支持。以下是ES可用于的一些场景:

  1. 电商网站提供搜索功能:可使用 ES来存储产品的目录和库存,并为它们提供搜索和自动填充建议。
  2. 收集日志和交易数据,并进行分析:可使用 Logstash来收集、聚合和解析数据, 然后让 Logstash将此数据提供给 ES。然后可在 ES中搜索和聚合开发者感兴趣的信息。
  3. 需要快速调查、分析、可视化查询大量数据的特定问题:可以使用ES存储数据,然后使用 Kibana构建自定义仪表板,来可视化展示数据。还可以使用ES的聚合功能针对这些数据进行复杂的商业分析。

我们要认识一个人Doug Cutting

为什么要提Doug Cutting,因为Elasticsearch的底层是Lucene,而Lucene就是Doug Cutting大神写的。

引用来自于:鲜枣课堂

1998年9月4日,Google公司在美国硅谷成立。正如大家所知,它是一家做搜索引擎起家的公司。

http://static.cyblogs.com/640.jpg

无独有偶,一位名叫Doug Cutting的美国工程师,也迷上了搜索引擎。他做了一个用于文本搜索的函数库(姑且理解为软件的功能组件),命名为Lucene。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/pUm6Hxkd434Mk1VTAruKa8.jpg

左为Doug Cutting,右为Lucene的LOGO

Lucene是用JAVA写成的,目标是为各种中小型应用软件加入全文检索功能。因为好用而且开源(代码公开),非常受程序员们的欢迎。

早期的时候,这个项目被发布在Doug Cutting的个人网站和SourceForge(一个开源软件网站)。后来,2001年底,Lucene成为Apache软件基金会jakarta项目的一个子项目。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/iaqYeVeXiaLwMxssVyfyV0f69tfVMod6.jpg

Apache软件基金会,搞IT的应该都认识

2004年,Doug Cutting再接再励,在Lucene的基础上,和Apache开源伙伴Mike Cafarella合作,开发了一款可以代替当时的主流搜索的开源搜索引擎,命名为Nutch。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/aqYeVeXiaLwMxssV.png

Nutch是一个建立在Lucene核心之上的网页搜索应用程序,可以下载下来直接使用。它在Lucene的基础上加了网络爬虫和一些网页相关的功能,目的就是从一个简单的站内检索推广到全球网络的搜索上,就像Google一样。

Nutch在业界的影响力比Lucene更大。

大批网站采用了Nutch平台,大大降低了技术门槛,使低成本的普通计算机取代高价的Web服务器成为可能。甚至有一段时间,在硅谷有了一股用Nutch低成本创业的潮流。

随着时间的推移,无论是Google还是Nutch,都面临搜索对象“体积”不断增大的问题。

尤其是Google,作为互联网搜索引擎,需要存储大量的网页,并不断优化自己的搜索算法,提升搜索效率。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/TAruKa8WKbr3qDia9ba.jpg

Google搜索栏

在这个过程中,Google确实找到了不少好办法,并且无私地分享了出来。

2003年,Google发表了一篇技术学术论文,公开介绍了自己的谷歌文件系统GFS(Google File System)。这是Google公司为了存储海量搜索数据而设计的专用文件系统。

第二年,也就是2004年,Doug Cutting基于Google的GFS论文,实现了分布式文件存储系统,并将它命名为NDFS(Nutch Distributed File System)。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/google_gfs_ndfs.jpg

还是2004年,Google又发表了一篇技术学术论文,介绍自己的MapReduce编程模型。这个编程模型,用于大规模数据集(大于1TB)的并行分析运算。

第二年(2005年),Doug Cutting又基于MapReduce,在Nutch搜索引擎实现了该功能。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/goole_mapreduce.jpg

2006年,当时依然很厉害的Yahoo(雅虎)公司,招安了Doug Cutting。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/goole_mapreduce_002.jpg

这里要补充说明一下雅虎招安Doug的背景:2004年之前,作为互联网开拓者的雅虎,是使用Google搜索引擎作为自家搜索服务的。在2004年开始,雅虎放弃了Google,开始自己研发搜索引擎。所以。。。

加盟Yahoo之后,Doug Cutting将NDFS和MapReduce进行了升级改造,并重新命名为Hadoop(NDFS也改名为HDFS,Hadoop Distributed File System)。

这个,就是后来大名鼎鼎的大数据框架系统——Hadoop的由来。而Doug Cutting,则被人们称为Hadoop之父。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/goole_mapreduce_003.jpg

Hadoop这个名字,实际上是Doug Cutting他儿子的黄色玩具大象的名字。所以,Hadoop的Logo,就是一只奔跑的黄色大象。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/goole_mapreduce_004.jpg

我们继续往下说。

还是2006年,Google又发论文了。

这次,它们介绍了自己的BigTable。这是一种分布式数据存储系统,一种用来处理海量数据的非关系型数据库。

Doug Cutting当然没有放过,在自己的hadoop系统里面,引入了BigTable,并命名为HBase。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/goole_mapreduce_005.jpg

好吧,反正就是紧跟Google时代步伐,你出什么,我学什么。

所以,Hadoop的核心部分,基本上都有Google的影子。

浅谈ElasticSearch架构以及集成
http://static.cyblogs.com/goole_mapreduce_006.png

其实从这里也能看到,站在巨人肩膀上或者仿照强者,也可以走出一条属于自己的道路。

安装Elasticsearch

➜  Tools  brew search elasticsearch
==> Formulae
elasticsearch                                  elasticsearch@2.4                              elasticsearch@5.6

➜  Tools  brew install elasticsearch@5.6
==> Downloading https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.16.tar.gz
######################################################################## 100.0%
Warning: elasticsearch@5.6 has been deprecated!
==> Caveats
Data:    /usr/local/var/elasticsearch/elasticsearch_chenyuan/
Logs:    /usr/local/var/log/elasticsearch/elasticsearch_chenyuan.log
Plugins: /usr/local/opt/elasticsearch@5.6/libexec/plugins/
Config:  /usr/local/etc/elasticsearch/
plugin script: /usr/local/opt/elasticsearch@5.6/libexec/bin/elasticsearch-plugin

elasticsearch@5.6 is keg-only, which means it was not symlinked into /usr/local,
because this is an alternate version of another formula.

If you need to have elasticsearch@5.6 first in your PATH run:
  echo 'export PATH="/usr/local/opt/elasticsearch@5.6/bin:$PATH"' >> ~/.zshrc


To have launchd start elasticsearch@5.6 now and restart at login:
  brew services start elasticsearch@5.6
Or, if you don't want/need a background service you can just run:
  /usr/local/opt/elasticsearch@5.6/bin/elasticsearch
==> Summary

以上是关于浅谈errgroup的使用以及源码分析的主要内容,如果未能解决你的问题,请参考以下文章

浅谈AQS同步队列(含ReentrantLock加锁和解锁源码分析)

浅谈AQS锁实现机制(含ReentrantReadWriteLock读写锁加锁解锁相关源码分析)

mybatis缓存源码分析之浅谈缓存设计

浅谈ElasticSearch架构以及集成

浅谈ThreadPoolExecutor线程池底层源码

浅谈Java集合(底层源码解析)