golang pipeline_by_running_chan.go

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang pipeline_by_running_chan.go相关的知识,希望对你有一定的参考价值。

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())

	in := input_gen(10)
	out1 := stage1_gen(in, 2)
	out2 := stage2_gen(out1, 3)

	for o := range out2 {
		p("output: ", o)
	}
}

func input_gen(count int) chan int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < count; i++ {
			in <- i
		}
	}()
	return in
}

func stage1_gen(in <-chan int, limit int) chan int {
	out := make(chan int)
	wg := CloseTrigger(limit, func() { close(out) })

	for j := 0; j < limit; j++ {
		go func() {
			defer wg.Done()
			for i := range in {
				p("start stage 1 worker with input", i)
				time.Sleep(time.Second / 2)
				p("exit stage 1 worker with input", i)
				out <- i
			}
		}()
	}
	return out
}

func stage2_gen(in <-chan int, limit int) chan int {
	out := make(chan int)
	wg := CloseTrigger(limit, func() { close(out) })

	for j := 0; j < limit; j++ {
		go func() {
			defer wg.Done()
			for i := range in {
				p("start stage 2 worker with input", i)
				time.Sleep(time.Second / 2)
				p("exit stage 2 worker with input", i)
				out <- i
			}
		}()
	}
	return out
}

func CloseTrigger(limit int, f func()) *sync.WaitGroup {
	wg := new(sync.WaitGroup)
	wg.Add(limit)
	go func() {
		wg.Wait()
		f()
	}()
	return wg
}

func p(v ...interface{}) {
	fmt.Println(v...)
}

Golang 学习之路

Golang基础

Golang基础之包概念
Golang基础之数据类型
Golang基础之流程控制
Golang基础之数组
Golang基础之切片
Golang基础之map
Golang基础之函数
Golang基础之指针
Golang基础之结构体
Golang基础之接口

Golang Web开发

Golang 其它

算法

以上是关于golang pipeline_by_running_chan.go的主要内容,如果未能解决你的问题,请参考以下文章

Golang入门到项目实战 第一个golang应用

golang编译androidso无法加载

golang如何打印内存内容

Golang入门到项目实战 golang匿名函数

json [Golang] golang #golang #snippets中有用的片段

golang使用成本高