go语言设计模式之Concurrency workers pool

Posted aguncn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go语言设计模式之Concurrency workers pool相关的知识,希望对你有一定的参考价值。

worker.go

package main

import (
	"fmt"
	"strings"
)

type WorkerLauncher interface {
	LaunchWorker(in chan Request)
}

type PreffixSuffixWorker struct {
	id      int
	prefixS string
	suffixS string
}

func (w *PreffixSuffixWorker) LaunchWorker(in chan Request) {
	w.prefix(w.append(w.upperCase(in)))
}

func (w *PreffixSuffixWorker) upperCase(in <-chan Request) <-chan Request {
	out := make(chan Request)

	go func() {
		for msg := range in {
			s, ok := msg.Data.(string)

			if !ok {
				msg.handler(nil)
				continue
			}

			msg.Data = strings.ToUpper(s)
			out <- msg
		}
		close(out)
	}()
	return out
}

func (w *PreffixSuffixWorker) append(in <-chan Request) <-chan Request {
	out := make(chan Request)

	go func() {
		for msg := range in {
			uppercaseString, ok := msg.Data.(string)

			if !ok {
				msg.handler(nil)
				continue
			}
			msg.Data = fmt.Sprintf("%s%s", uppercaseString, w.suffixS)
			out <- msg
		}
		close(out)
	}()
	return out
}

func (w *PreffixSuffixWorker) prefix(in <-chan Request) {
	go func() {
		for msg := range in {
			uppercasedStringWithSuffix, ok := msg.Data.(string)
			if !ok {
				msg.handler(nil)
				continue
			}
			msg.handler(fmt.Sprintf("%s%S", w.prefixS, uppercasedStringWithSuffix))
		}
	}()
}

  

dispatcher.go

package main

import (
	"time"
)

type Dispatcher interface {
	LaunchWorker(w WorkerLauncher)
	MakeRequest(Request)
	Stop()
}

type dispatcher struct {
	inCh chan Request
}

func (d *dispatcher) LaunchWorker(w WorkerLauncher) {
	w.LaunchWorker(d.inCh)
}

func (d *dispatcher) Stop() {
	close(d.inCh)
}

func (d *dispatcher) MakeRequest(r Request) {
	select {
	case d.inCh <- r:
	case <-time.After(time.Second * 5):
		return
	}
}

func NewDispatcher(b int) Dispatcher {
	return &dispatcher{
		inCh: make(chan Request, b),
	}
}

  

workers_pipeline.go

package main

import (
	"fmt"
	"log"
	"sync"
)

type RequestHandler func(interface{})

type Request struct {
	Data    interface{}
	handler RequestHandler
}

func NewStringRequest(s string, id int, wg *sync.WaitGroup) Request {
	myRequest := Request{
		Data: "Hello",
		handler: func(i interface{}) {
			defer wg.Done()
			s, ok := i.(string)
			if !ok {
				log.Fatal("Invalid casting to string")
			}
			fmt.Println(s)
		},
	}
	return myRequest
}

func main() {
	bufferSize := 100
	var dispatcher Dispatcher = NewDispatcher(bufferSize)

	workers := 3
	for i := 0; i < workers; i++ {
		var w WorkerLauncher = &PreffixSuffixWorker{
			prefixS: fmt.Sprintf("WorkerID: %d -> ", i),
			suffixS: " World",
			id:      i,
		}
		dispatcher.LaunchWorker(w)
	}
	requests := 10
	var wg sync.WaitGroup
	wg.Add(requests)

	for i := 0; i < requests; i++ {
		req := NewStringRequest("(MSG_ID: %d) -> Hello", i, &wg)
		dispatcher.MakeRequest(req)
	}
	dispatcher.Stop()
	wg.Wait()
}

  技术图片

以上是关于go语言设计模式之Concurrency workers pool的主要内容,如果未能解决你的问题,请参考以下文章

go语言设计模式之Concurrency pipeline

go语言设计模式之Concurrency workers pool

GO_11:GO语言基础之并发concurrency

GO语言基础之并发concurrency

《Go语言实战》摘录:7.3 并发模式 - work

《Go语言实战》摘录:7.3 并发模式 - work