StaTaskScheduler 和 STA 线程消息泵送
Posted
技术标签:
【中文标题】StaTaskScheduler 和 STA 线程消息泵送【英文标题】:StaTaskScheduler and STA thread message pumping 【发布时间】:2014-02-08 07:46:38 【问题描述】:TL;DR:StaTaskScheduler
运行的任务内部出现死锁。 长版本:
我正在使用 Parallel Team 的 ParallelExtensionsExtras 中的 StaTaskScheduler
来托管第三方提供的一些旧版 STA COM 对象。 StaTaskScheduler
实现细节的描述如下:
好消息是 TPL 的实现可以在任一平台上运行 MTA 或 STA 线程,并考虑到相关差异 WaitHandle.WaitAll 等底层 API(仅支持 MTA 当方法提供多个等待句柄时线程)。
我认为这意味着 TPL 的阻塞部分将使用等待 API 来泵送消息,例如 CoWaitForMultipleHandles
,以避免在 STA 线程上调用时出现死锁情况。
在我的情况下,我相信正在发生以下情况:进程内 STA COM 对象 A 调用进程外对象 B,然后期望 B 回调作为传出调用的一部分。
简化形式:
var result = await Task.Factory.StartNew(() =>
// in-proc object A
var a = new A();
// out-of-proc object B
var b = new B();
// A calls B and B calls back A during the Method call
return a.Method(b);
, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);
问题是,a.Method(b)
永远不会返回。据我所知,这是因为BlockingCollection<Task>
内部某处的阻塞等待不会发送消息,所以我对引用语句的假设可能是错误的。
已编辑 在测试 WinForms 应用程序的 UI 线程上执行相同的代码时(即提供 TaskScheduler.FromCurrentSynchronizationContext()
而不是 staTaskScheduler
到 Task.Factory.StartNew
)。
解决这个问题的正确方法是什么?我是否应该实现一个自定义同步上下文,它将使用CoWaitForMultipleHandles
显式泵送消息,并将其安装在StaTaskScheduler
启动的每个STA 线程上?
如果是这样,BlockingCollection
的底层实现会调用我的SynchronizationContext.Wait
方法吗?我可以使用SynchronizationContext.WaitHelper
来实现SynchronizationContext.Wait
吗?
已编辑,其中一些代码显示托管 STA 线程在执行阻塞等待时不会泵送。该代码是一个完整的控制台应用程序,可用于复制/粘贴/运行:
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleTestApp
class Program
// start and run an STA thread
static void RunStaThread(bool pump)
// test a blocking wait with BlockingCollection.Take
var tasks = new BlockingCollection<Task>();
var thread = new Thread(() =>
// Create a simple Win32 window
var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);
// subclass it with a custom WndProc
IntPtr prevWndProc = IntPtr.Zero;
var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
if (msg == NativeMethods.WM_TEST)
Console.WriteLine("WM_TEST processed");
return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
);
prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
if (prevWndProc == IntPtr.Zero)
throw new ApplicationException();
// post a test WM_TEST message to it
NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);
// BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
try var task = tasks.Take();
catch (Exception e) Console.WriteLine(e.Message);
if (pump)
// NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
Console.WriteLine("Now start pumping...");
NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
);
thread.SetApartmentState(ApartmentState.STA);
thread.Start();
Thread.Sleep(2000);
// this causes the STA thread to end
tasks.CompleteAdding();
thread.Join();
static void Main(string[] args)
Console.WriteLine("Testing without pumping...");
RunStaThread(false);
Console.WriteLine("\nTest with pumping...");
RunStaThread(true);
Console.WriteLine("Press Enter to exit");
Console.ReadLine();
// Interop
static class NativeMethods
[DllImport("user32")]
public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);
[DllImport("user32")]
public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);
[DllImport("user32.dll")]
public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);
[DllImport("user32.dll")]
public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);
[DllImport("user32.dll")]
public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);
public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);
public const int GWL_WNDPROC = -4;
public const int WS_POPUP = unchecked((int)0x80000000);
public const int WM_USER = 0x0400;
public const int WM_TEST = WM_USER + 1;
这会产生输出:
无需泵送测试... 集合参数是空的,并且在添加方面已被标记为完整。 抽水测试... 集合参数是空的,并且在添加方面已被标记为完整。 现在开始抽... WM_TEST 已处理 按 Enter 退出【问题讨论】:
您引用的文字是指Task
和线程池的实现,而不是StaTaskScheduler
。托管线程做泵;见this 和this。所以我不知道你为什么会看到死锁。
@StephenCleary,我确实意识到这是关于底层 WaitHandle.Wait
而不是 TPL。我用一些示例代码编辑了这个问题,显示了 STA 线程不泵的情况。我的代码有错误吗?
@avo,在您描述的场景中,当 COM 对对象 B
进行进程外调用并等待它返回时,应该会发生泵送。在这种情况下,B
应该能够回拨A
。问题一定是别的。
此外,您发布的示例使用常规 Windows 消息,CoWaitForMultipleHandles
可能会忽略该消息。文档说:STA 中的默认值只是发送了一小部分特殊情况的消息(即,没有COWAIT_DISPATCH_WINDOW_MESSAGES
)。根据@StephenCleary 发布的link,CLR 确实将CoWaitForMultipleHandles
用于托管STA 线程,但可能没有COWAIT_DISPATCH_WINDOW_MESSAGES
。
@avo,你可能对this感兴趣。
【参考方案1】:
我对您的问题的理解:您使用StaTaskScheduler
只是为了为您的旧 COM 对象组织经典的 COM STA 单元。您没有在StaTaskScheduler
的 STA 线程上运行 WinForms 或 WPF 核心消息循环。也就是说,您没有在该线程中使用Application.Run
、Application.DoEvents
或Dispatcher.PushFrame
之类的东西。如果这是一个错误的假设,请纠正我。
StaTaskScheduler
本身不会在它创建的 STA 线程上安装任何同步上下文。因此,您依赖 CLR 为您发送消息。我只在 Chris Brumme 的Apartments and Pumping in the CLR 中发现了 CLR 在 STA 线程上泵送的隐含确认:
我一直在说,托管阻塞将执行“一些抽水”,当 在 STA 线程上调用。确切地知道什么不是很好吗 会被抽吗?不幸的是,抽水是一门黑色艺术, 超出凡人的理解。在 Win2000 及更高版本上,我们只需委托给 OLE32 的 CoWaitForMultipleHandles 服务。
这表明 CLR 在内部为 STA 线程使用 CoWaitForMultipleHandles
。此外,COWAIT_DISPATCH_WINDOW_MESSAGES
标志 mention this 的 MSDN 文档:
...在 STA 中只是发送的一小部分特殊情况的消息。
我做了some research on that,但无法使用CoWaitForMultipleHandles
从您的示例代码中抽取WM_TEST
,我们在cmets 中讨论了您的问题。我的理解是,上述一小部分特殊情况的消息 实际上仅限于 一些 COM 编组器特定的消息,并且不包括任何常规的通用消息,例如你的WM_TEST
。
所以,回答你的问题:
...我应该实现一个自定义同步上下文吗? 使用 CoWaitForMultipleHandles 显式泵送消息,并安装它 在 StaTaskScheduler 启动的每个 STA 线程上?
是的,我相信创建自定义同步上下文并覆盖 SynchronizationContext.Wait
确实是正确的解决方案。
但是,您应该避免使用CoWaitForMultipleHandles
,而改用MsgWaitForMultipleObjectsEx
。如果MsgWaitForMultipleObjectsEx
表示队列中有一条待处理的消息,您应该使用PeekMessage(PM_REMOVE)
和DispatchMessage
手动抽取它。然后您应该继续等待句柄,所有这些都在同一个 SynchronizationContext.Wait
调用中。
注意MsgWaitForMultipleObjectsEx
和MsgWaitForMultipleObjects
之间存在细微但重要的区别。如果队列中已经有消息(例如,PeekMessage(PM_NOREMOVE)
或 GetQueueStatus
),但没有被删除,则后者不会返回并继续阻塞。这对抽水不利,因为您的 COM 对象可能正在使用类似 PeekMessage
的东西来检查消息队列。这可能会导致 MsgWaitForMultipleObjects
在未预料到的情况下被阻止。
OTOH,带有MWMO_INPUTAVAILABLE
标志的MsgWaitForMultipleObjectsEx
没有这样的缺点,在这种情况下会返回。
不久前,我创建了一个自定义版本的StaTaskScheduler
(available here as ThreadAffinityTaskScheduler
) 以尝试解决different problem:为后续await
延续维护一个具有线程亲和性的线程池。如果您跨多个 awaits
使用 STA COM 对象,线程关联性是至关重要的。原始的StaTaskScheduler
仅在其池被限制为 1 个线程时才会表现出此行为。
所以我继续对您的WM_TEST
案例进行了更多试验。最初,我在 STA 线程上安装了标准 SynchronizationContext
类的实例。 WM_TEST
消息未按预期发送。
然后我覆盖了SynchronizationContext.Wait
,将其转发到SynchronizationContext.WaitHelper
。它确实被调用了,但仍然没有抽水。
最后,我实现了一个功能齐全的消息泵循环,这是它的核心部分:
// the core loop
var msg = new NativeMethods.MSG();
while (true)
// MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
// even if there's a message already seen but not removed in the message queue
nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
count, waitHandles,
(uint)remainingTimeout,
QS_MASK,
NativeMethods.MWMO_INPUTAVAILABLE);
if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
return managedResult;
// there is a message, pump and dispatch it
if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
NativeMethods.TranslateMessage(ref msg);
NativeMethods.DispatchMessage(ref msg);
if (hasTimedOut())
return WaitHandle.WaitTimeout;
这确实有效,WM_TEST
被抽了。 以下是您的测试的改编版本:
public static async Task RunAsync()
using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
await staThread.Run(async () =>
Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
// create a simple Win32 window
IntPtr hwnd = CreateTestWindow();
// Post some WM_TEST messages
Console.WriteLine("Post some WM_TEST messages...");
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
Console.WriteLine("Press Enter to continue...");
await ReadLineAsync();
Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));
Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
, CancellationToken.None);
Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
输出:
初始线程 #9 在 STA 线程 #10 发布一些 WM_TEST 消息... 按 Enter 继续... WM_TEST 已处理:1 WM_TEST 已处理:2 WM_TEST 已处理:3 等待之后,线程#10 队列中的待处理消息:False 退出 STA 线程 #10 当前线程#12 按任意键退出请注意,此实现同时支持线程关联(它在await
之后保持在线程#10 上)和消息泵送。完整的源代码包含可重复使用的部分(ThreadAffinityTaskScheduler
和 ThreadWithAffinityContext
),可通过here as self-contained console app 获取。它尚未经过彻底测试,因此使用它需要您自担风险。
【讨论】:
有效!我将您的ThreadWithAffinityContext
保持不变,并按照您的示例中所示使用它。我在其中创建了我的 COM 对象 A
和 B
。它确实解决了问题,僵局已经过去了!非常感谢您在这方面所做的工作,事实证明它对我很有用,我认为其他人也会从中受益。
另外,你是对的,我没有在该后台 STA 线程上使用任何 .NET UI 控件或表单,它仅用于遗留 COM 对象和一些辅助 .NET 数据结构。
非常感谢 Noseratio!这是迄今为止我看到的第一种管理 COM 互操作的可靠方法,即不易颠覆整个编程模型的拳头。太棒了!【参考方案2】:
STA 线程泵的主题很大,很少有程序员能愉快地解决死锁。关于它的开创性论文由 .NET 的主要聪明人 Chris Brumme 撰写。您可以在this blog post 中找到它。不幸的是,它的细节相当短,他并没有超越注意到 CLR 进行了 bit 的抽水,但没有关于确切规则的任何细节。
他所说的代码是在 .NET 2.0 中添加的,存在于名为 MsgWaitHelper() 的内部 CLR 函数中。 .NET 2.0 的源代码可通过 SSCLI20 分发获得。非常完整,但不包括 MsgWaitHelper() 的源代码。很不寻常。反编译它是一个失败的原因,它非常大。
从他的博客文章中可以看出的一件事是重新进入的危险。抽入 STA 线程是危险的,因为它能够在您的程序未处于允许执行此类代码的正确状态时分派 Windows 消息并执行任意代码。大多数 VB6 程序员在使用 DoEvents() 在代码中获取模式循环以停止冻结 UI 时都知道这一点。我写了a post 关于它最典型的危险。 MsgWaitHelper() 会自己执行这种精确的抽水操作,但是它对它允许运行的确切 什么 类型的代码非常有选择性。
您可以通过在不附加调试器的情况下运行程序,然后附加非托管调试器来了解它在测试程序中的作用。你会看到它在 NtWaitForMultipleObjects() 上阻塞。我更进一步并在 PeekMessageW() 上设置了一个断点,以获取此堆栈跟踪:
user32.dll!PeekMessageW() Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305 C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087 C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int) Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode) Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *) Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *) Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *) Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int) Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *) Unknown
请注意,我在 Windows 8.1 上记录了此堆栈跟踪,它在旧 Windows 版本上看起来会大不相同。 COM 模态循环在 Windows 8 中进行了大量修改,这对 WinRT 程序来说也是一个非常重要的问题。不太了解它,但它似乎有另一个名为 ASTA 的 STA 线程模型,它执行更严格的泵送,包含在添加的 CoWaitForMultipleObjects() 中
ObjectNative::WaitTimeout() 是 BlockingCollection.Take() 方法中的 SemaphoreSlim.Wait() 开始执行 CLR 代码的地方。您会看到它遍历内部 CLR 代码的各个级别以到达神秘的 MsgWaitHelper() 函数,然后切换到臭名昭著的 COM 模态调度程序循环。
它在您的程序中执行“错误”类型的泵送的 bat 信号标志是对 CliModalLoop::PeekRPCAndDDEMessage() 方法的调用。换句话说,它仅考虑发布到特定内部窗口的互操作消息类型,该窗口调度跨越公寓边界的 COM 调用。它不会为您自己的窗口抽取消息队列中的消息。
这是可以理解的行为,只有当 Windows 看到您的 UI 线程空闲时,它才能绝对确定重入不会杀死您的程序。当它自己泵送消息循环时它是空闲的,对 PeekMessage() 或 GetMessage() 的调用指示该状态。问题是,你不会自己抽水。您违反了 STA 线程的核心契约,它必须 泵送消息循环。因此,希望 COM 模态循环为您进行抽水是空想。
您实际上可以解决此问题,即使我不建议您这样做。 CLR 将把它留给应用程序本身,由正确构造的 SynchronizationContext.Current 对象执行等待。您可以通过派生自己的类并覆盖 Wait() 方法来创建一个。调用 SetWaitNotificationRequired() 方法来让 CLR 相信它应该由您决定。演示该方法的不完整版本:
class MySynchronizationProvider : System.Threading.SynchronizationContext
public MySynchronizationProvider()
base.SetWaitNotificationRequired();
public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
for (; ; )
int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
else return result;
[DllImport("user32.dll")]
private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
int millisecondTimeout, int mask);
并将其安装在线程的开头:
System.ComponentModel.AsyncOperationManager.SynchronizationContext =
new MySynchronizationProvider();
您现在将看到您的 WM_TEST 消息正在发送。它是对调度它的 Application.DoEvents() 的调用。我可以通过使用 PeekMessage + DispatchMessage 来掩盖它,但这会混淆这段代码的危险,最好不要将 DoEvents() 放在桌子下面。你真的在这里玩了一个非常危险的重入游戏。请勿使用此代码。
长话短说,正确使用 StaThreadScheduler 的唯一希望是在已经实现 STA 合约的代码中使用它,并且像 STA 线程一样进行抽水。它实际上是作为旧代码的创可贴,您不必奢侈地控制线程状态。就像在 VB6 程序或 Office 加载项中启动的任何代码一样。尝试了一下,我不认为它实际上可以工作。同样值得注意的是,随着 asych/await 的可用性,应该完全消除对它的需求。
【讨论】:
感谢您的精彩解释 感谢您详细解释幕后发生的事情。它让我清楚地了解了这个问题。您发布的Wait
版本确实吸引了我的WM_TEST
消息。不幸的是,它并没有解决原来的死锁问题。正如其他答案所暗示的那样,我将尝试MsgWaitForMultipleObjectsEx
与MsgWaitForMultipleObjects
。
@HansPassant,我很想知道DoEvents
与仅专用于 COM 对象的后台 STA 线程有什么关系,它与主 UI 线程是分开的。 OP 甚至没有指定他是使用 WPF 还是 WinForms,FWIW。我强烈避免在不创建任何WinForms
组件的线程上调用DoEvents
。
我并不是想用我的 p/invoke CreateWindow
示例混淆任何人,如果我这样做了,对不起。这只是为了表明StaTaskScheduler
不会抽水,我想不出更好的方法。在问题的第一部分,我只使用 COM 对象。但是他们可能在里面使用了一些Win32窗口,我没有他们的来源。
@avo,是否使用DoEvents
进行抽水取决于您,但请注意this serious bug 会影响其行为。还要记住 MsgWaitForMultipleObjectsEx
与 MsgWaitForMultipleObjects
语义,用于观察队列中的现有消息。以上是关于StaTaskScheduler 和 STA 线程消息泵送的主要内容,如果未能解决你的问题,请参考以下文章
如何在 STA 线程上与引导程序同时运行动画 SplashScreen