如何正确实现并发goroutine(和/或限制它们)以产生一致的结果?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何正确实现并发goroutine(和/或限制它们)以产生一致的结果?相关的知识,希望对你有一定的参考价值。
我正在使用这个:(符号是[]字符串以及filteredSymbols)
concurrency := 5
sem := make(chan bool, concurrency)
for i := range symbols {
sem <- true
go func(int) {
defer func() { <-sem }()
rows, err := stmt.Query(symbols[i])
if <some condition is true> {
filteredSymbols = append(filteredSymbols, symbols[i])
}
}(i)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
限制同时运行的goroutine数量。我需要限制它,因为每个goroutine都与Postgres数据库交互,有时我确实有超过3000个符号来评估。该代码用于分析大型金融数据,股票和其他证券。我也使用相同的代码从db获取OHLC和预先计算的数据。这是一种现代方法吗?我问这个是因为WaitGroups已经存在,而我正在寻找一种方法来代替它们。
另外,我发现上面的方法有时会产生不同的结果。我有一个代码,有时得到的filteredSymbols数量是1409.如果不改变参数,它将产生1407个结果,然后有1408个结果。我甚至有一个代码,结果有很大的缺陷。
下面的代码非常不一致,所以我删除了并发。 (注意,在下面的代码中,我甚至不必限制并发goroutine,因为它们只使用内存资源)。删除并发修复了它
func getCommonSymbols(symbols1 []string, symbols2 []string) (symbols []string) {
defer timeTrack(time.Now(), "Get common symbols")
// concurrency := len(symbols1)
// sem := make(chan bool, concurrency)
// for _, s := range symbols1 {
for _, sym := range symbols1 {
// sym := s
// sem <- true
// go func(string) {
// defer func() { <-sem }()
for k := range symbols2 {
if sym == symbols2[k] {
symbols = append(symbols, sym)
break
}
}
// }(sym)
}
// for i := 0; i < cap(sem); i++ {
// sem <- true
// }
return
}
答案
您有一个数据竞争,多个goroutines同时更新filteredSymbols。您可以进行的最小修改是在附加调用周围添加互斥锁,例如:
concurrency := 5
sem := make(chan bool, concurrency)
l := sync.Mutex{}
for i := range symbols {
sem <- true
go func(int) {
defer func() { <-sem }()
rows, err := stmt.Query(symbols[i])
if <some condition is true> {
l.Lock()
filteredSymbols = append(filteredSymbols, symbols[i])
l.Unlock()
}
}(i)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
Race Detector也可以帮助你发现这一点。一种常见的替代方法是使用通道将工作转换为goroutine,并使用通道来获得结果,例如。
concurrency := 5
workCh := make(chan string, concurrency)
resCh := make(chan string, concurrency)
workersWg := sync.WaitGroup{}
// start the required number of workers, use the WaitGroup to see when they're done
for i := 0; i < concurrency; i++ {
workersWg.Add(1)
go func() {
defer workersWg.Done()
for symbol := range workCh {
// do some work
if cond {
resCh <- symbol
}
}
}()
}
go func() {
// when all the workers are done, close the resultsCh
workersWg.Wait()
close(resCh)
}()
// submit all the work
for _, s := range symbols {
workCh <- s
}
close(workCh)
// collect up the results
for r := range resCh {
filteredSymbols = append(filteredSymbols, r)
}
以上是关于如何正确实现并发goroutine(和/或限制它们)以产生一致的结果?的主要内容,如果未能解决你的问题,请参考以下文章