golang Golang的工作队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang Golang的工作队列相关的知识,希望对你有一定的参考价值。

# Golang Job Queue

A running example of the code from:

* http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang
* http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html

### Step 1

Small refactorings made to original code:

* Use non-exported private methods
* Remove global variables
* Bring the flags closer to their usage in main()

### Step 2

Simplify the worker queue by removing the `Dispatcher`.

* Creates workers directly and passes job queue to them

https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-worker_refactored-go

### Performance

The test run with Pprof show performance characteristics remain the same between both examples.

## Run the Application

Boot either the `worker_original.go` or the `worker_refactored.go` applications. Use flags to adjust the `max_workers` and `max_queue_size` to override the default values.

    $ go run worker_original.go -max_workers 5

cURL the application from another terminal window:

    $ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done
// Original code with Dispatcher
package main

import (
	_ "expvar"
	"flag"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"
	"time"
)

// Job holds the attributes needed to perform unit of work.
type Job struct {
	Name  string
	Delay time.Duration
}

// NewWorker creates takes a numeric id and a channel w/ worker pool.
func NewWorker(id int, workerPool chan chan Job) Worker {
	return Worker{
		id:         id,
		jobQueue:   make(chan Job),
		workerPool: workerPool,
		quitChan:   make(chan bool),
	}
}

type Worker struct {
	id         int
	jobQueue   chan Job
	workerPool chan chan Job
	quitChan   chan bool
}

func (w Worker) start() {
	go func() {
		for {
			// Add my jobQueue to the worker pool.
			w.workerPool <- w.jobQueue

			select {
			case job := <-w.jobQueue:
				// Dispatcher has added a job to my jobQueue.
				fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds())
				time.Sleep(job.Delay)
				fmt.Printf("worker%d: completed %s!\n", w.id, job.Name)
			case <-w.quitChan:
				// We have been asked to stop.
				fmt.Printf("worker%d stopping\n", w.id)
				return
			}
		}
	}()
}

func (w Worker) stop() {
	go func() {
		w.quitChan <- true
	}()
}

// NewDispatcher creates, and returns a new Dispatcher object.
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
	workerPool := make(chan chan Job, maxWorkers)

	return &Dispatcher{
		jobQueue:   jobQueue,
		maxWorkers: maxWorkers,
		workerPool: workerPool,
	}
}

type Dispatcher struct {
	workerPool chan chan Job
	maxWorkers int
	jobQueue   chan Job
}

func (d *Dispatcher) run() {
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(i+1, d.workerPool)
		worker.start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-d.jobQueue:
			go func() {
				fmt.Printf("fetching workerJobQueue for: %s\n", job.Name)
				workerJobQueue := <-d.workerPool
				fmt.Printf("adding %s to workerJobQueue\n", job.Name)
				workerJobQueue <- job
			}()
		}
	}
}

func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) {
	// Make sure we can only be called with an HTTP POST request.
	if r.Method != "POST" {
		w.Header().Set("Allow", "POST")
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Parse the delay.
	delay, err := time.ParseDuration(r.FormValue("delay"))
	if err != nil {
		http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
		return
	}

	// Validate delay is in range 1 to 10 seconds.
	if delay.Seconds() < 1 || delay.Seconds() > 10 {
		http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
		return
	}

	// Set name and validate value.
	name := r.FormValue("name")
	if name == "" {
		http.Error(w, "You must specify a name.", http.StatusBadRequest)
		return
	}

	// Create Job and push the work onto the jobQueue.
	job := Job{Name: name, Delay: delay}
	jobQueue <- job

	// Render success.
	w.WriteHeader(http.StatusCreated)
}

func main() {
	var (
		maxWorkers   = flag.Int("max_workers", 5, "The number of workers to start")
		maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
		port         = flag.String("port", "8080", "The server port")
	)
	flag.Parse()

	// Create the job queue.
	jobQueue := make(chan Job, *maxQueueSize)

	// Start the dispatcher.
	dispatcher := NewDispatcher(jobQueue, *maxWorkers)
	dispatcher.run()

	// Start the HTTP handler.
	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		requestHandler(w, r, jobQueue)
	})
	log.Fatal(http.ListenAndServe(":"+*port, nil))
}
package main

import (
	_ "expvar"
	"flag"
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof"
	"time"
)

type job struct {
	name     string
	duration time.Duration
}

type worker struct {
	id int
}

func (w worker) process(j job) {
	fmt.Printf("worker%d: started %s, working for %f seconds\n", w.id, j.name, j.duration.Seconds())
	time.Sleep(j.duration)
	fmt.Printf("worker%d: completed %s!\n", w.id, j.name)
}

func requestHandler(jobCh chan job, w http.ResponseWriter, r *http.Request) {
	// Make sure we can only be called with an HTTP POST request.
	if r.Method != "POST" {
		w.Header().Set("Allow", "POST")
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Parse the durations.
	duration, err := time.ParseDuration(r.FormValue("delay"))
	if err != nil {
		http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
		return
	}

	// Validate delay is in range 1 to 10 seconds.
	if duration.Seconds() < 1 || duration.Seconds() > 10 {
		http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
		return
	}

	// Set name and validate value.
	name := r.FormValue("name")
	if name == "" {
		http.Error(w, "You must specify a name.", http.StatusBadRequest)
		return
	}

	// Create Job and push the work onto the jobCh.
	job := job{name, duration}
	go func() {
		fmt.Printf("added: %s %s\n", job.name, job.duration)
		jobCh <- job
	}()

	// Render success.
	w.WriteHeader(http.StatusCreated)
	return
}

func main() {
	var (
		maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
		maxWorkers   = flag.Int("max_workers", 5, "The number of workers to start")
		port         = flag.String("port", "8080", "The server port")
	)
	flag.Parse()

	// create job channel
	jobCh := make(chan job, *maxQueueSize)

	// create workers
	for i := 0; i < *maxWorkers; i++ {
		w := worker{i}
		go func(w worker) {
			for j := range jobCh {
				w.process(j)
			}
		}(w)
	}

	// handler for adding jobs
	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		requestHandler(jobCh, w, r)
	})
	log.Fatal(http.ListenAndServe(":"+*port, nil))
}
package main

import (
	_ "expvar"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type job struct {
	name     string
	duration time.Duration
}

type worker struct {
	id int
}

func (w worker) process(j job) {
	fmt.Printf("worker%d: started %s, working for %fs\n", w.id, j.name, j.duration.Seconds())
	time.Sleep(j.duration)
	fmt.Printf("worker%d: completed %s!\n", w.id, j.name)
}

func main() {
	wg := &sync.WaitGroup{}
	jobCh := make(chan job)

	// start workers
	for i := 0; i < 10; i++ {
		wg.Add(1)
		w := worker{i}
		go func(w worker) {
			for j := range jobCh {
				w.process(j)
			}
			wg.Done()
		}(w)
	}

	// add jobs to queue
	for i := 0; i < 100; i++ {
		name := fmt.Sprintf("job-%d", i)
		duration := time.Duration(rand.Intn(1000)) * time.Millisecond
		fmt.Printf("adding: %s %s\n", name, duration)
		jobCh <- job{name, duration}
	}

	// close jobCh and wait for workers to complete
	close(jobCh)
	wg.Wait()
}

golang使用rabbitmq工作队列

在helloworld那一章讲了一个简单的一个生产者一个消费者的模型,但生产系统中很多时候是多个生产者或者多个消费者同时处理的。

生产者

	body := bodyFrom(os.Args)
	err = ch.Publish("", q.Name, false, false, amqp.Publishing
		DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	)

bodyFrom

func bodyFrom(args []string) string 
	if len(args) > 1 
		return strings.Join(args[1:], " ")
	
	return "default"

消费者

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")


	go func() 
		for msg := range msgs 
			log.Printf("Received a message: %s", msg.Body)
			dotCount := bytes.Count(msg.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
		
	()

默认情况下消息会循环分发到消费者

消息ack

当消费者的autoack为true时,一旦收到消息就会直接把该消息设置为删除状态,如果消息的处理时间之内,消费者挂掉了那么这条消息就会丢失掉。
rabbitmq支持消息ack机制,将autoack设为false,当处理完毕再手动触发ack操作。如果处理消息的过程中挂掉了,那么这条消息就会分发给其他都消费者。

	msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")


	go func() 
		for msg := range msgs 
			log.Printf("Received a message: %s", msg.Body)
			dotCount := bytes.Count(msg.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
			msg.Ack(false)
		
	()

消息持久性

之前说的是消费者挂掉不会丢失消息的解决方法,那么如果mq挂掉呢,如果保证mq挂掉都不会丢失消息,那么就是消息持久性做到事情。

消息持久性分为两步

队列的持久性

将队列设为持久的,当mq重启的时候不会丢失队列

q, err := ch.QueueDeclare("workQueue", true, false, false, false, nil)

消息的持久性

发布消息时,设置消息为持久的

	err = ch.Publish("", q.Name, false, false, amqp.Publishing
		DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	)

公平分配

循环分配方式只能做到平均分配,每个消费者一个一次分配,但是平均不一定就是公平,比如有的消费者所在的服务器资源较差的情况下,平均肯定是不合理的,为了做到公平那么使用如下设置即可

err = ch.Qos(1, 0, false)

这段代码的意思是最多同时处理一个,多了先别发给我。

以上是关于golang Golang的工作队列的主要内容,如果未能解决你的问题,请参考以下文章

golang使用rabbitmq工作队列

golang使用rabbitmq工作队列

AMQP Golang优先级不工作

golang使用rabbitmq发布/订阅

golang使用rabbitmq发布/订阅

golang使用rabbitmq发布/订阅