// Worker represents the worker that executes the job.
// It reads jobs from JobChannel and leaves its run loop when a value
// arrives on quit.
type Worker struct {
// JobChannel is the channel the run loop started by Start receives jobs from.
JobChannel chan Job
// quit carries the stop signal: Stop sends on it, and the run loop started
// by Start returns when it receives from it.
quit chan bool
}
// NewWorker returns a Worker that will read its jobs from jobChannel.
// Each worker gets its own freshly allocated, unbuffered quit channel.
func NewWorker(jobChannel chan Job) Worker {
	var w Worker
	w.JobChannel = jobChannel
	w.quit = make(chan bool)
	return w
}
// Start launches the worker's run loop in a new goroutine and returns
// right away. The loop waits for either a job on JobChannel, whose payload
// it uploads to S3, or a value on quit, which ends the loop.
func (w Worker) Start() {
	runLoop := func() {
		for {
			select {
			case <-w.quit:
				// Stop was requested: leave the loop and end the goroutine.
				return
			case job := <-w.JobChannel:
				// A work request arrived. A failed upload is only logged;
				// the worker keeps serving subsequent jobs.
				err := job.Payload.UploadToS3()
				if err == nil {
					continue
				}
				log.Errorf("Error uploading to S3: %s", err.Error())
			}
		}
	}
	go runLoop()
}
// Stop signals the worker to leave its run loop. The signal is sent from a
// separate goroutine, so Stop itself returns without waiting for the worker
// to pick it up.
func (w Worker) Stop() {
	sendQuit := func() {
		w.quit <- true
	}
	go sendQuit()
}
// Dispatcher owns the job channel shared by its workers and remembers how
// many workers Run should start.
type Dispatcher struct {
	// JobChannel is the channel jobs are forwarded on; every worker started
	// by Run reads from it.
	JobChannel chan Job
	// maxWorkers is the number of workers Run starts.
	maxWorkers int
}

// NewDispatcher returns a Dispatcher configured for maxWorkers workers.
// Its JobChannel is buffered with room for maxWorkers jobs, i.e. up to one
// queued job per worker. A negative maxWorkers makes the channel allocation
// panic at run time.
func NewDispatcher(maxWorkers int) *Dispatcher {
	return &Dispatcher{
		JobChannel: make(chan Job, maxWorkers),
		// Keep the count: Run reads it to decide how many workers to start.
		maxWorkers: maxWorkers,
	}
}
// Run starts d.maxWorkers workers, all reading from d.JobChannel, and then
// launches dispatch in its own goroutine.
func (d *Dispatcher) Run() {
	// Bring up the workers; they all share the dispatcher's job channel.
	for remaining := d.maxWorkers; remaining > 0; remaining-- {
		NewWorker(d.JobChannel).Start()
	}

	go d.dispatch()
}
// dispatch forwards every job received from JobQueue to d.JobChannel, where
// the workers pick it up. It returns once JobQueue has been closed and
// drained.
//
// Ranging over the queue (instead of a bare receive in an endless loop)
// matters: a receive on a closed channel yields zero values forever, which
// would spin the loop and flood the workers with zero-value jobs.
func (d *Dispatcher) dispatch() {
	for job := range JobQueue {
		// Hand the job over from its own goroutine so that a full JobChannel
		// never stalls draining of JobQueue.
		// NOTE(review): the number of these goroutines is unbounded when jobs
		// arrive faster than the workers consume them.
		go func(job Job) {
			d.JobChannel <- job
		}(job)
	}
}