package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
in := input_gen(10)
out1 := stage1_gen(in, 2)
out2 := stage2_gen(out1, 3)
for o := range out2 {
p("output: ", o)
}
}
func input_gen(count int) chan int {
in := make(chan int)
go func() {
defer close(in)
for i := 0; i < count; i++ {
in <- i
}
}()
return in
}
func stage1_gen(in <-chan int, limit int) chan int {
out := make(chan int)
wg := CloseTrigger(limit, func() { close(out) })
for j := 0; j < limit; j++ {
go func() {
defer wg.Done()
for i := range in {
p("start stage 1 worker with input", i)
time.Sleep(time.Second / 2)
p("exit stage 1 worker with input", i)
out <- i
}
}()
}
return out
}
func stage2_gen(in <-chan int, limit int) chan int {
out := make(chan int)
wg := CloseTrigger(limit, func() { close(out) })
for j := 0; j < limit; j++ {
go func() {
defer wg.Done()
for i := range in {
p("start stage 2 worker with input", i)
time.Sleep(time.Second / 2)
p("exit stage 2 worker with input", i)
out <- i
}
}()
}
return out
}
func CloseTrigger(limit int, f func()) *sync.WaitGroup {
wg := new(sync.WaitGroup)
wg.Add(limit)
go func() {
wg.Wait()
f()
}()
return wg
}
func p(v ...interface{}) {
fmt.Println(v...)
}