go语言设计模式之Concurrency workers pool
Posted aguncn
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go语言设计模式之Concurrency workers pool相关的知识,希望对你有一定的参考价值。
worker.go
package main import ( "fmt" "strings" ) type WorkerLauncher interface { LaunchWorker(in chan Request) } type PreffixSuffixWorker struct { id int prefixS string suffixS string } func (w *PreffixSuffixWorker) LaunchWorker(in chan Request) { w.prefix(w.append(w.upperCase(in))) } func (w *PreffixSuffixWorker) upperCase(in <-chan Request) <-chan Request { out := make(chan Request) go func() { for msg := range in { s, ok := msg.Data.(string) if !ok { msg.handler(nil) continue } msg.Data = strings.ToUpper(s) out <- msg } close(out) }() return out } func (w *PreffixSuffixWorker) append(in <-chan Request) <-chan Request { out := make(chan Request) go func() { for msg := range in { uppercaseString, ok := msg.Data.(string) if !ok { msg.handler(nil) continue } msg.Data = fmt.Sprintf("%s%s", uppercaseString, w.suffixS) out <- msg } close(out) }() return out } func (w *PreffixSuffixWorker) prefix(in <-chan Request) { go func() { for msg := range in { uppercasedStringWithSuffix, ok := msg.Data.(string) if !ok { msg.handler(nil) continue } msg.handler(fmt.Sprintf("%s%S", w.prefixS, uppercasedStringWithSuffix)) } }() }
dispatcher.go
package main import ( "time" ) type Dispatcher interface { LaunchWorker(w WorkerLauncher) MakeRequest(Request) Stop() } type dispatcher struct { inCh chan Request } func (d *dispatcher) LaunchWorker(w WorkerLauncher) { w.LaunchWorker(d.inCh) } func (d *dispatcher) Stop() { close(d.inCh) } func (d *dispatcher) MakeRequest(r Request) { select { case d.inCh <- r: case <-time.After(time.Second * 5): return } } func NewDispatcher(b int) Dispatcher { return &dispatcher{ inCh: make(chan Request, b), } }
workers_pipeline.go
package main import ( "fmt" "log" "sync" ) type RequestHandler func(interface{}) type Request struct { Data interface{} handler RequestHandler } func NewStringRequest(s string, id int, wg *sync.WaitGroup) Request { myRequest := Request{ Data: "Hello", handler: func(i interface{}) { defer wg.Done() s, ok := i.(string) if !ok { log.Fatal("Invalid casting to string") } fmt.Println(s) }, } return myRequest } func main() { bufferSize := 100 var dispatcher Dispatcher = NewDispatcher(bufferSize) workers := 3 for i := 0; i < workers; i++ { var w WorkerLauncher = &PreffixSuffixWorker{ prefixS: fmt.Sprintf("WorkerID: %d -> ", i), suffixS: " World", id: i, } dispatcher.LaunchWorker(w) } requests := 10 var wg sync.WaitGroup wg.Add(requests) for i := 0; i < requests; i++ { req := NewStringRequest("(MSG_ID: %d) -> Hello", i, &wg) dispatcher.MakeRequest(req) } dispatcher.Stop() wg.Wait() }
以上是关于go语言设计模式之Concurrency workers pool的主要内容,如果未能解决你的问题,请参考以下文章