如何以“优先级”进行多线程?
Posted
技术标签:
【中文标题】如何以“优先级”进行多线程?【英文标题】:How to go about multithreading with "priority"? 【发布时间】:2012-12-02 08:49:29 【问题描述】:我有多个线程在后台处理多个文件,而程序处于空闲状态。 为了提高磁盘吞吐量,我使用临界区来确保不会有两个线程同时使用同一个磁盘。
(伪)代码如下所示:
void RunThread(HANDLE fileHandle)
// Acquire CRITICAL_SECTION for disk
CritSecLock diskLock(GetDiskLock(fileHandle));
for (...)
// Do some processing on file
一旦用户请求处理文件,我需要停止所有线程 -- 除了正在处理请求文件的线程。处理完文件后,我想再次恢复所有线程。
鉴于SuspendThread
是个坏主意,我该如何停止除处理相关输入的线程之外的所有线程?
我需要什么样的线程对象/功能——互斥体、信号量、事件或其他?我将如何使用它们? (我希望与 Windows XP 兼容。)
【问题讨论】:
我认为条件变量是最好的方法。 听起来您不仅需要暂停线程。您要运行的线程可能没有它需要的磁盘锁。您需要正在“暂停”的线程来释放磁盘锁,以便处理请求的文件的线程可以获得所需的磁盘锁。没有内置机制可以做到这一点,您需要线程本身在其工作时偶尔检查某些条件。一种方法是让每个线程处理一个文件块,释放磁盘锁,等到它有权继续(可能在条件变量上)然后重新获得磁盘锁。 @SteveJessop:是的,完全正确!这是问题的另一部分...我需要以某种方式执行if (ThisThreadNeedsToSuspend()) ReleaseDiskLock(); WaitForResumeNotice(); ReacquireDiskLock();
之类的操作,但我不确定实现它的正确方法是什么,而不会导致ThisThreadNeedsToSuspend
和WaitForResumeNotice
之间的竞争条件。
@Mehrdad:好的,如果这是你需要的,我会发布答案:-)
@SteveJessop:很酷,谢谢! :)
【参考方案1】:
我建议您以完全不同的方式进行。如果您真的希望每个磁盘只有一个线程(我不相信这是一个好主意),那么您应该为每个磁盘创建一个线程,并在排队处理文件时分发文件。
为了实现对特定文件的优先级请求,我会让一个线程在其正常处理过程中的多个点检查“优先级槽”(当然在其主队列等待循环中)。
【讨论】:
问题是,在我得到用户的输入之前,所有文件都没有任何“优先级”。一旦我这样做了,“磁盘线程”通常已经开始处理不同的文件......此时我需要等待它完成才能继续,这会给用户带来很多延迟。当我得到用户输入时,我真的需要停止一切......所以我看不到每个磁盘的线程将如何工作,除非我使用纤程(难以管理)或以某种方式“拆分”处理每个文件的工作(如果可能,我想避免,因为它变得丑陋)。 正如我所说,我会在处理过程中多点检查。或者,只需降低线程的优先级,让一些更高优先级的线程来处理用户请求。 @SebastianRedl:“检查”部分不是问题——问题是,如果我做检查并看到还有其他需要处理的东西同一个(磁盘)线程,我如何“停止”我正在做的事情?在我可以处理不同的文件之前,它必须继续。 @SebastianRedl:优先级并没有多大帮助,因为它实际上只对 CPU 有效——而且因为处理每个文件都是 I/O 绑定和内存绑定(在过程),它最终并没有很好地确定优先级。 +1 塞巴斯蒂安的方法是正确的。 @Mehrdad 如果您遵循塞巴斯蒂安的建议,您的问题并不难。 (你也与自己的规范相矛盾——你说用户想要设置优先级,然后你说你不能很好地优先级)事实上,按照他的建议,你可以更好地确保你不会遇到一些疯狂线程错误 - 每个线程/磁盘有一个互斥的作业队列可以工作............在你有工作代码之后担心效率(你总是可以重构)。【参考方案2】:这里的困难不是优先级本身,而是你希望一个线程退出它持有的锁,让另一个线程拿走它。 “优先级”与应安排运行一组可运行线程中的哪一个有关——您希望使一个线程不可运行(因为它正在等待另一个线程持有的锁)。
所以,你想实现(如你所说):
if (ThisThreadNeedsToSuspend()) ReleaseDiskLock(); WaitForResume(); ReacquireDiskLock();
由于您(明智地)使用范围锁,我想反转逻辑:
while (file_is_not_finished)
WaitUntilThisThreadCanContinue();
CritSecLock diskLock(blah);
process_part_of_the_file();
ReleasePriority();
...
void WaitUntilThisThreadCanContinue()
MutexLock lock(thread_priority_mutex);
while (thread_with_priority != NOTHREAD and thread_with_priority != thisthread)
condition_variable_wait(thread_priority_condvar);
void GiveAThreadThePriority(threadid)
MutexLock lock(thread_priority_mutex);
thread_with_priority = threadid;
condition_variable_broadcast(thread_priority_condvar);
void ReleasePriority()
MutexLock lock(thread_priority_mutex);
if (thread_with_priority == thisthread)
thread_with_priority = NOTHREAD;
condition_variable_broadcast(thread_priority_condvar);
阅读条件变量——所有最近的操作系统都有它们,具有类似的基本操作。它们也在 Boost 和 C++11 中。
如果您无法编写函数process_part_of_the_file
,那么您就不能以这种方式构造它。相反,您需要一个可以释放和重新获得磁盘锁的作用域锁。最简单的方法是使其成为互斥锁,然后您可以使用相同的互斥锁等待 condvar。您仍然可以以几乎相同的方式使用 mutex/condvar 对和 thread_with_priority
对象。
您可以根据需要系统对优先级更改的响应程度来选择“文件的一部分”的大小。如果您需要它非常响应,那么该方案实际上并不起作用——这是合作多任务处理。
我对这个答案并不完全满意,如果有很多其他线程已经在同一个磁盘锁上等待,那么具有优先级的线程可能会被饿死很长时间。为了避免这种情况,我会考虑更多。可能不应该有每个磁盘的锁,而是应该在条件变量及其关联的互斥锁下处理整个事情。不过,我希望这能让你开始。
【讨论】:
+1 感谢关于条件变量的提示; XP 没有它们,但我会看看我是否可以实现与它们类似的东西。我必须重新阅读答案几次才能更好地理解它:)但到目前为止,这看起来最有希望。 啊,没想到。检查 Boost 在 XP 上提供的内容,他们可能已经为您完成了工作。 我最终找到了一种更简单的方法,似乎适用于我的特定情况,但我会接受这个,因为它似乎是通用情况的更好答案。【参考方案3】:您可以要求线程优雅地停止。只需检查线程内循环中的一些变量,然后根据其值继续或终止工作。
关于它的一些想法:
此值的设置和检查应在临界区内完成。 因为临界区会减慢线程的速度,所以检查应该经常进行,以便在需要时快速停止线程,并且很少进行检查,这样线程就不会因获取和释放临界区而停止。【讨论】:
【参考方案4】:每个工作线程处理文件后,检查与该线程关联的条件变量。条件变量可以简单地实现为布尔+临界区。或使用 InterlockedExchange* 功能。老实说,我通常只是在线程之间使用不受保护的布尔值来表示“需要退出”——如果工作线程可能正在休眠,有时会使用事件句柄。
为每个线程设置条件变量后,主线程通过WaitForSingleObject等待每个线程退出。
DWORD __stdcall WorkerThread(void* pThreadData)
ThreadData* pData = (ThreadData*) pTheradData;
while (pData->GetNeedToExit() == false)
ProcessNextFile();
return 0;
void StopWokerThread(HANDLE hThread, ThreadData* pData)
pData->SetNeedToExit = true;
WaitForSingleObject(hThread);
CloseHandle(hThread);
struct ThreadData()
CRITICAL_SECITON _cs;
ThreadData()
InitializeCriticalSection(&_cs);
~ThreadData()
DeleteCriticalSection(&_cs);
ThreadData::SetNeedToExit()
EnterCriticalSection(&_cs);
_NeedToExit = true;
LeaveCriticalSeciton(&_cs);
bool ThreadData::GetNeedToExit()
bool returnvalue;
EnterCriticalSection(&_cs);
returnvalue = _NeedToExit = true;
LeaveCriticalSeciton(&_cs);
return returnvalue;
;
【讨论】:
我需要了解更多关于条件变量的知识;我以前没有用过它们。 :) 但是关于你的其余答案——一旦我处理了用户输入,我需要其他线程继续他们正在做的事情,所以它不是真的“需要退出”而是“需要暂停”。 一旦线程完成执行(或者如果用户取消操作),线程将告诉主线程刚刚处理的文件——此时我可以唤醒所有线程向上。问题是,如何在没有SuspendThread
和ResumeThread
的情况下正确地做到这一点,这很危险。
你为什么要在线程暂停时保持线程活跃?只要让它们退出,然后在可以安全地进行更多后台工作时创建新线程。
但是我将不得不重新开始。拆分每个文件的处理对我来说并不容易——这就是我想暂停线程的原因。
我要说的一点是,线程在处理单个文件(工作单元)之前不会退出。在它处理一个单独的文件之后,如果它看到主线程告诉它“是时候退出”,它只是清理一个出口。当它再次被创建时,它会返回处理下一个文件。【参考方案5】:
您还可以使用线程池并通过使用I/O Completion port 来调节它们的工作。
池中的线程通常会休眠以等待 I/O 完成端口事件/活动。 当您有请求时,I/O 完成端口会释放线程并开始执行工作。
【讨论】:
【参考方案6】:好的,这个怎么样:
每个磁盘有两个线程,用于高优先级和低优先级请求,每个线程都有自己的输入队列。
最初提交的高优先级磁盘任务随后将与任何正在运行的低优先级任务并行发出其磁盘请求。它可以重置低优先级线程在可能时等待的 ManualResetEvent(WaitForSingleObject),因此如果高优先级线程正在执行磁盘操作,它将被阻塞。高优先级线程应该在完成任务后设置事件。
这应该将磁盘抖动限制在提交高优先级任务和低优先级线程可以在 MRE 上等待之间的时间间隔(如果有)。提高为高优先级队列提供服务的线程的 CPU 优先级可能有助于提高此时间间隔内高优先级工作的性能。
编辑:“队列”是指线程安全的、阻塞的、生产者-消费者队列,(明确一点:)。
更多编辑 - 如果发布线程需要通知作业完成,则发布到队列的任务可以包含一个“OnCompletion”事件,以使用任务对象作为参数进行调用。例如,事件处理程序可以发出一个 AutoResetEvent 信号,表明发起线程正在等待,从而提供同步通知。
【讨论】:
我的印象是,特定文件(和其他文件)运行后可以从低优先级“提升”到高优先级。基本上,后台任务知道有许多文件要处理,并以任意顺序运行它们,但如果用户对某个特定文件感兴趣,则应切换到该文件。那么在您的解决方案中,增加文件优先级的机制是什么? 哦 - 如果这确实是一个要求,我还没有 :( 我没有考虑在提交后重新确定任务的优先级,我认为这些任务会被遗忘,直到一个或其他线程触发完成事件。实时优先级绑定意味着必须维护某种列表或对现有任务的其他引用,例如在 listBox 项目中。然后必须更改实际的“优先级” . 那会很混乱——我会努力避免那种枪战。以上是关于如何以“优先级”进行多线程?的主要内容,如果未能解决你的问题,请参考以下文章