使用 QueueUserWorkItem(不是 .NET)等待工作项完成池化

Posted

技术标签:

【中文标题】使用 QueueUserWorkItem(不是 .NET)等待工作项完成池化【英文标题】:wait for works item to complete pooled using QueueUserWorkItem (not .NET) 【发布时间】:2009-08-18 09:54:48 【问题描述】:

我有一些使用遗留 QueueUserWorkItem 函数汇集的工作项(我需要支持比 vista 更早的操作系统,所以

for( <loop thru some items >)

    QueueUserWorkItem( )

在继续下一步之前,我需要等待这些。 我见过几个类似的答案......但它们在.NET中。 我正在考虑为每个项目存储一个事件并等待它们(喘气!),但还有其他更好、更轻量级的方法吗? (没有内核锁)

澄清:我知道使用事件。我对不需要内核级锁的解决方案感兴趣。

【问题讨论】:

【参考方案1】:

AFAIK 唯一可以做到这一点的方法是在每个任务完成时设置一个 InterlockIncrement'ed 计数器。

然后你可以做一个

while( counter < total )
    Sleep( 0 );

任务可以发出事件(或其他同步对象)信号,您可以执行以下操作

while( count < total )
    WaitForSingleObject( hEvent, INFINITE );

第二种方法意味着主线程使用更少的处理时间。

编辑:TBH 避免内核锁定的唯一方法是自旋锁定,这意味着您将有一个内核浪费时间,否则这些时间可用于处理您的工作队列(或其他任何事情)。如果你真的必须避免内核锁,那么使用 Sleep(0) 旋转锁。但是,我绝对建议只使用内核锁,将额外的 CPU 时间收回来进行有价值的处理,并且不要再担心“非”问题。

【讨论】:

Sleep(0) 和 WaitForSingleObject(INFINITE) 都会导致 UI 或 STA 线程出现问题。这两个调用都会阻止消息循环,从而阻止窗口和 COM 消息处理,从而导致 UI 和 COM 对象无响应。此答案中的模式是等待多个线程的最糟糕的模式。【参考方案2】:

如果您实际拆分执行堆栈,则无需等待即可:直到您在调用线程上执行的 for 循环,在 for 循环之后您在最后一个队列线程上完成:

 CallerFunction(...)
 
   sharedCounter = <numberofitems>;
   for (<loop>)
   
     QueueUserWorkItem(WorkerFunction, ...);
   
   exit thread; (return, no wait logic)
 

WorkerFunction(, ...) 

  // execute queued item here
  counter = InterlockeDdecrement(sharedCounter);
  if (0 == counter)
  
    // this is the last thread, all others are done
   continue with the original logic from  CallerFunction here
  

这是否可行取决于许多因素,我不能说如果不了解调用者上下文的更多信息,是否可以在调用线程上暂停其执行并在排队线程上恢复它。顺便说一句,“退出线程”并不是指突然的线程中止,而是优雅的返回,并且整个调用堆栈都准备将执行上下文移交给队列线程。我认为这不是一项微不足道的任务。

【讨论】:

+1:我们使用非常相似的模式,除了我们使用单个内核事件来表示所有工作人员都已完成。 @Filip:是的,最后一个线程发出事件信号,调用者唤醒并恢复。正如 OP 所要求的那样,我上面提出的完全是无事件的,但代价是“切换”执行上下文的复杂性。事实上,这个“开关”只不过是一个伪装的异步回调调用,所以并不是那么难。 如果没有自动选择答案,我会选择这个。 :(【参考方案3】:

这是我过去成功使用的一种方法:

将您的“完成”任务实现为引用计数对象。每个工作线程在工作时都持有对该对象的引用,然后在完成时释放它。完成任务在 ref 计数达到 0 时开始工作。

示例

注意:我的 C++ 在主要使用 C# 工作多年后已经生锈了,所以将下面的示例视为伪代码

完成任务

class MyCompletionTask 

private:

    long _refCount;

public:

    MyCompletionTask() 
        _refCount = 0;
    

public: // Reference counting implementation

   // Note ref-counting mechanism must be thread-safe,
   // so we use the Interlocked functions.

    void AddRef()
    
        InterlockedIncrement(&_refCount);
    

    void Release()
    
        long newCount = InterlockedDecrement(&_refCount);

        if (newCount == 0) 
             DoCompletionTask();
             delete this;
        
    

private:

    void DoCompletionTask()
    
        // TODO: Do your thing here
    

调用代码

MyCompletionTask *task = new MyCompletionTask();

task->AddRef(); // add a reference for the main thread

for( <loop thru some items >)

    task->AddRef(); // Add a reference on behalf of the worker
                    // thread.  The worker thread is responsible
                    // for releasing when it is done.

    QueueUserWorkItem(ThreadProc, (PVOID)task, <etc> );


task->Release(); // release main thread reference

// Note: this thread can exit.  The completion task will run
// on the thread that does the last Release.

线程进程

void ThreadProc(void *context) 

    MyCompletionTask *task = (MyCompletionTask)context;

    //  TODO: Do your thing here

    task->Release();

使用这种方法要记住的一点是,完成任务在其上完成的线程是不确定的。这将取决于哪个工作线程首先完成(或主线程,如果所有工作线程在主线程调用 Release 之前完成)

【讨论】:

但这不会给我同样的问题吗?我一直在等待调用线程完成任务完成。对不起,如果我不明白。 使用这种方法,您无需在主线程中等待。在完成任务中,放置所有工作线程完成后要运行的代码,并在调用最后一个 Release 时调用。【参考方案4】:
    Sleep(N)(顺便说一下 .Net spinlock 在 Sleep(0) 上实现) WaitForSingleObject 与 Event 或其他同步对象一起使用,无论是否超时

【讨论】:

【参考方案5】:

您可以有一个计数器,每个任务在完成时自动递增(如前所述),然后让它检查它是否是最后一个任务(计数器 == 总计),如果是,则设置一个事件。然后主线程只需要WaitForSingleObject()。只需确保检查也以原子方式完成即可。

// on task finished
Lock();
count++;
bool done = count == total;
Unlock();
if ( done )
    SetEvent( hEvent );

【讨论】:

【参考方案6】:

我认为使用事件几乎是你最好的选择(也是最轻量级的)。我唯一要补充的是,在等待工作项完成时,您可以通过以下方式简化代码:

HANDLE WorkEvents[ 5 ]; // store you event handles here

...

WaitForMultipleObjects(5, WorkEvents, TRUE, INFINITE);

这将等待所有事件在一个系统调用中完成。


编辑:如果您不介意旋转工作项,另一种方法是在每个线程上调用 GetExitCodeThread,检查退出状态。

【讨论】:

【参考方案7】:

您有多种选择:

使用一个 Semaphore 而不是多个 Event 实例 - 它支持计数锁定/等待。 使用多个 Critical Section 实例 - 它们比 Event 轻。 从工作线程使用InterlockedDecrement 并在等待线程句柄上旋转Wait...(超时时间短)循环。您必须根据自己的喜好调整超时值。此外,如果您的主线程是 STA,请确保您使用了正确的 Wait... 方法来旋转消息循环。请注意,在某些情况下,您的等待也会因 APC 事件而中断。 使用其他答案中的任何技术在最后一个排队的工作线程上传输完成任务。请注意,这仅在完成任务未绑定到主线程时才有用。

【讨论】:

信号量在这种情况下不起作用,因为当计数>0 时会发出信号。他想等到所有线程都完成(count==0)。 @DSO - 真的吗?他不能倒数吗? :-)

以上是关于使用 QueueUserWorkItem(不是 .NET)等待工作项完成池化的主要内容,如果未能解决你的问题,请参考以下文章

使用私有成员函数调用 Win32 QueueUserWorkItem()

为啥在“QueueUserWorkItem”中使用“SND_SYNC”不好?

在高流量场景中使用 ASP.NET 中的 ThreadPool.QueueUserWorkItem

ThreadPool.QueueUserWorkItem 方法 (WaitCallback)

如何将信息从 ThreadPool.QueueUserWorkItem 传递回 UI 线程?

如何实现线程池的 QueueUserWorkItem 方法的延续?