如何在执行一组工作人员之间正确延迟

Posted

技术标签:

【中文标题】如何在执行一组工作人员之间正确延迟【英文标题】:How to properly delay between executing a pool of workers 【发布时间】:2022-01-19 17:45:27 【问题描述】:

美好的一天

我正在尝试在工作人员执行之间实现正确的延迟,例如,工作人员需要完成 30 个任务并进入睡眠状态 5 秒,我如何在代码中准确跟踪 30 个任务已完成,然后才进入睡眠状态 5 秒

下面是创建 30 个工人 池的代码,这些工人依次以无序的方式一次执行 30 件任务,代码如下:


import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct 
    id       int
    randomno int

type Result struct 
    job         Job
    sumofdigits int


var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int 
    sum := 0
    no := number
    for no != 0 
        digit := no % 10
        sum += digit
        no /= 10
    
    time.Sleep(2 * time.Second)
    return sum

func worker(wg *sync.WaitGroup) 
    for job := range jobs 
        output := Resultjob, digits(job.randomno)
        results <- output
    
    wg.Done()

func createWorkerPool(noOfWorkers int) 
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ 
        wg.Add(1)
        go worker(&wg)
    

    wg.Wait()
    close(results)

func allocate(noOfJobs int) 
    for i := 0; i < noOfJobs; i++ 
        if i != 0 && i%30 == 0 
            fmt.Printf("SLEEPAGE 5 sec...")
            time.Sleep(10 * time.Second)

        
        randomno := rand.Intn(999)
        job := Jobi, randomno
        jobs <- job
    
    close(jobs)

func result(done chan bool) 
    for result := range results 
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    
    done <- true

func main() 
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 30
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")

播放:https://go.dev/play/p/lehl7hoo-kp

目前尚不清楚如何确保完成 30 个任务以及在哪里插入延迟,如果有任何帮助,我将不胜感激

【问题讨论】:

所有worker都休眠5秒还是当worker完成30个任务时休眠5秒? 每个人都需要入睡 您当前的代码似乎已经在 allocate 函数中执行此操作,此时您睡眠了 10 秒。唯一的问题是您正在使用缓冲通道,因此工作人员将继续工作,直到它为空。如果您将var jobs = make(chan Job, 10) 更改为var jobs = make(chan Job),工人将完成他们当前的工作然后停止,这是否会产生预期的结果? 我没有看到工作有任何特别的变化,虽然一切都正常,但我不确定这和我的代码是否正确 【参考方案1】:

好的,让我们从这个工作示例开始:

func Test_t(t *testing.T) 

    // just a published, this publishes result on a chan
    publish := func(s int, ch chan int, wg *sync.WaitGroup) 
        ch <- s // this is blocking!!!
        wg.Done()
    

    wg := &sync.WaitGroup
    wg.Add(100)

    // we'll use done channel to notify the work is done
    res := make(chan int)
    done := make(chan struct)
    // create worker that will notify that all results were published
    go func() 
        wg.Wait()
        done <- struct
    ()
    
    // let's create a jobs that publish on our res chan
    // please note all goroutines are created immediately
    for i := 0; i < 100; i++ 
        go publish(i, res, wg)
    

    // lets get 30 args and then wait
    var resCounter int
forloop:
    for 
        select 
        case ss := <-res:
            println(ss)
            resCounter += 1
            // break the loop
            if resCounter%30 == 0 
                // after receiving 30 results we are blocking this thread
                // no more results will be taken from the channel for 5 seconds
                println("received 30 results, waiting...")
                time.Sleep(5 * time.Second)
            
        case <-done:
            // we are done here, let's break this infinite loop
            break forloop
        
    

我希望这进一步表明它是如何做到的。

那么,您的代码有什么问题? 老实说,它看起来不错(我的意思是发布了 30 个结果,然后代码等待,然后是另外 30 个结果,等等),但问题是你想在哪里等待?

我猜有几种可能:

创建工人(这就是您的代码现在的工作方式,正如我所见,它以 30 个包发布作业;请注意,您在 digit 函数中的 2 秒延迟仅适用于代码的 goroutine被执行)

触发工人(所以“等待”代码应该在工人函数中,不允许运行更多工人 - 所以它必须观察发布了多少结果)

处理结果(这是我的代码的工作方式,正确的同步在 forloop 中)

【讨论】:

感谢您的代码,我会检查您的代码并让您知道它是否适合我 是否有可能以某种方式限制每单位时间执行的发布数量,这样就不会破坏同步?例如,发布的数量 = 30,间隔 = 1 秒是否有可能以某种方式在 1 秒内分配发布的执行,例如? 1 秒内发表不超过 30 篇文章? 我会使用 time.Ticker(每 X 秒在频道上发布一条消息)并在其后发布消息。无论如何,您遇到的问题是位置同步 - 但就像我说的那样,这取决于应该同步的内容(并且您需要以某种方式使用频道制作魔法)。

以上是关于如何在执行一组工作人员之间正确延迟的主要内容,如果未能解决你的问题,请参考以下文章

Foreach 循环仅针对第一组工作人员返回错误消息

风暴,螺栓延迟和总延迟之间的巨大差异?

NodeJS Cluster如何在工作人员之间共享对象数组

理解延迟与吞吐量

如何取消延迟的线程?

我可以在 parfor (MATLAB) 上在工作人员之间发送和接收数据吗?