在 Windows 中的另一个线程上异步启动和取消 I/O 的无竞争方式

Posted

技术标签:

【中文标题】在 Windows 中的另一个线程上异步启动和取消 I/O 的无竞争方式【英文标题】:Race-free way to asynchronously start AND cancel I/O on another thread in Windows 【发布时间】:2022-01-05 03:07:26 【问题描述】:

背景:一般来说,如果我们想强制一个操作异步发生(避免阻塞主线程),使用FILE_FLAG_OVERLAPPED是不够的,因为操作仍然可以同步完成。

假设为了避免这种情况,我们将操作推迟到专用于 I/O 的工作线程。这样可以避免阻塞主线程。

现在主线程可以使用CancelIoEx(HANDLE, LPOVERLAPPED)取消worker发起的I/O操作(比如通过ReadFile)。

但是,要让CancelIoEx 成功,主线程需要一种方法来保证操作实际上已经开始,否则没有什么可以取消的。

这里明显的解决方案是让工作线程设置一个事件之后它调用例如ReadFile 返回,但现在又把我们带回到原来的问题:由于 ReadFile 可以阻塞,我们将首先破坏拥有工作线程的全部目的,即确保主线程是'没有在 I/O 上阻塞。

解决这个问题的“正确”方法是什么? 有没有一种好方法可以实际强制 I/O 操作异步发生,同时仍然能够在以后的无竞争中请求取消它I/O 尚未完成时的方式?

我唯一能想到的就是设置一个计时器,以便在 I/O 尚未完成时定期调用 CancelIoEx,但这看起来非常难看。有没有更好/更强大的解决方案?

【问题讨论】:

needs a way to guarantee that the operation has in fact started, otherwise there is nothing to cancel. 那么问题出在哪里呢?主线程的效果完全一样,如果操作还没有开始,工作线程可以忽略取消。 这看起来真的是一个 XY 问题。取消已安排好的 IO 旨在解决什么问题?我首先会认真质疑这种设计。 “好的,我必须做 X。哦,等等,我改变了主意!取消 X!!!” 如果你的回答是。 “嗯,能够取消 X 很重要”,如果 X 已经完成,你打算如何撤消它?如果能够取消操作是“关键”,那么弄清楚如何撤消 IO 操作也是“关键”。如果能够撤消它并不重要,那么能够取消它并不重要。 不应该是主线程调用CancelIo(),应该由工作线程来完成。定义一个您在主线程中发出信号的事件,以告诉工作线程停止做它正在做的任何事情。 Worker 中的 WaitForMultipleObjects 来检查该事件,以及重叠 I/O 的完成情况。 “我们想要强制一个操作异步发生” - 这听起来很奇怪。您试图通过禁止操作立即退出来解决什么具体问题? @0___________:问题是稍后有一些事情要取消,即您太早地致电CancelIoEx。问题是如何避免这种竞争条件。 【参考方案1】:

你通常需要下一步:

用于异步 I/O 的每个文件句柄都封装到 一些 c/c++ 对象(命名为IO_OBJECT

这个对象需要有引用计数

在开始异步 I/O 操作之前 - 你需要分配另一个 对象,它封装了OVERLAPPEDIO_STATUS_BLOCK(让名称 它IO_IRP) 在IO_IRP 中存储指向IO_OBJECT 的引用指针 和特定的 io 信息 - I/O 代码(读、写等)缓冲区 指针,..

检查 I/O 操作的返回码是否确定,将是 I/O 回调(数据包排队到 iocp 或 apc)或如果操作失败(将 没有回调) - 仅使用错误代码自行调用回调

I/O 管理器保存您在 IRP 结构中传递给 I/O 的指针 (UserApcContext) 和 当 I/O 完成时将它传回给你(如果使用 win32 api 这个指针 在本机 api 的情况下指向 OVERLAPPED 的相等指针 - 您可以通过 自控this指针)

当 I/O 完成 ID(如果不是同步开始时失败)- 回调 最终的 I/O 状态将被调用

这里你得到了指向IO_IRPOVERLAPPED)的指针——调用方法 IO_OBJECT并释放引用,删除IO_IRP

如果你在某个时候可以提前关闭对象句柄(不是在 析构函数) - 实现一些损坏保护,以防止访问 关闭后处理

run-down protection 非常类似于弱引用,不幸的是没有 用户模式 ​​api,但并不难自己实现

从任何线程,如果您有指向对象的指针(当然是引用),您可以调用 CancelIoEx 或关闭对象句柄 - 如果文件有 IOCP,当文件的最后一个句柄关闭时 - 所有 I/O 操作将被取消。但是对于关闭 - 您无需直接调用 CloseHandle 而是开始运行并在运行完成时调用 CloseHandle(在一些 ReleaseRundownProtection 调用中(这是演示名称,没有这样的 api)

一些最小的典型实现:

class __declspec(novtable) IO_OBJECT 

    friend class IO_IRP;

    virtual void IOCompletionRoutine(
        ULONG IoCode, 
        ULONG dwErrorCode, 
        ULONG dwNumberOfBytesTransfered, 
        PVOID Pointer) = 0;
    
    void AddRef();
    void Release();

    HANDLE _hFile = 0;
    LONG _nRef = 1;
    //...
;


class IO_IRP : public OVERLAPPED 

    IO_OBJECT* _pObj;
    PVOID Pointer;
    ULONG _IoCode;
    
    IO_IRP(IO_OBJECT* pObj, ULONG IoCode, PVOID Pointer) : 
        _pObj(pObj), _IoCode(IoCode), Pointer(Pointer)
    
        pObj->AddRef();
    
    
    ~IO_IRP()
    
        _pObj->Release();
    
    
    VOID CALLBACK IOCompletionRoutine(
        ULONG dwErrorCode,
        ULONG dwNumberOfBytesTransfered,
        )
    
        _pObj->IOCompletionRoutine(_IoCode, 
            dwErrorCode, dwNumberOfBytesTransfered, Pointer);

        delete this;
    

    static VOID CALLBACK FileIOCompletionRoutine(
        ULONG status,
        ULONG dwNumberOfBytesTransfered,
        LPOVERLAPPED lpOverlapped
        )
    
        static_cast<IO_IRP*>(lpOverlapped)->IOCompletionRoutine(
            RtlNtStatusToDosError(status), dwNumberOfBytesTransfered);
    

    static BOOL BindIoCompletion(HANDLE hObject)
    
        return BindIoCompletionCallback(hObject, FileIOCompletionRoutine, 0));
    
    
    void CheckErrorCode(ULONG dwErrorCode)
    
        switch (dwErrorCode)
        
        case NOERROR:
        case ERROR_IO_PENDING:
            return ;
        
        IOCompletionRoutine(dwErrorCode, 0);
    
    
    void CheckError(BOOL fOk)
    
        return CheckErrorCode(fOk ? NOERROR : GetLastError());
    
;


///// start some I/O // no run-downprotection on file

if (IO_IRP* irp = new IO_IRP(this, 'some', 0))

    irp->CheckErrorCode(ReadFile(_hFile, buf, cb, 0, irp));


///// start some I/O // with run-downprotection on file

if (IO_IRP* irp = new IO_IRP(this, 'some', 0))

    ULONG dwError = ERROR_INVALID_HANDLE;
    
    if (AcquireRundownProtection())
    
        dwError = ReadFile(_hFile, buf, cb, 0, irp) ? NOERROR : GetLastError();
        ReleaseRundownProtection();
    
    
    irp->CheckErrorCode(dwError);

some 更完整的实现


但是,要使CancelIoEx 成功,主线程需要一种方法来 保证操作实际上已经开始,否则有 没有什么可取消的。

是的,尽管您可以随时安全地调用 CancelIoEx,即使文件上没有活动的 I/O,事实上,在您调用 CancelIoEx 之后,另一个线程已经可以开始新的 I/O 操作。通过此调用,您可以取消当前已知的单一启动操作。例如 - 您开始连接 ConnectEx 并更新 UI(启用 Cancel 按钮)。当ConnectEx 完成时 - 您将消息发布到 UI(禁用 Cancel 按钮)。如果用户按下 Cancel 直到 I/O (ConnectEx) ative - 你调用 CancelIoEx - 结果连接将被取消或正常提前完成。以防周期性操作(例如循环中的ReadFile) - 通常CancelIoEx 不是停止此类循环的正确方法。相反,您需要从控制线程调用CloseHandle - 这有效地取消了文件中的所有当前 I/O。


关于ReadFile 和任何异步 I/O api 是如何工作的,我们是否可以强制从 api 调用中更快地返回。

    I/O 管理器检查输入参数,转换句柄(文件句柄到 FILE_OBJECT) 指向指针,检查权限等,如果出现错误 此阶段 - 为调用者返回错误并且 I/O 完成 I/O 管理器调用驱动程序。司机(或几个司机 - ***司机可以 将请求传递给另一个)处理 I/O 请求(IRP),最后 返回 I/O 管理器。它可以返回或STATUS_PENDING,这意味着 I/O 仍未完成或完成 I/O(调用 IofCompleteRequest) 并返回另一个状态。其他任何状态 比STATUS_PENDING 表示 I/O 已完成(成功,错误 或已取消,但已完成) I/O mahager 检查STATUS_PENDING 以及文件是否打开 同步 I/O(标志 FO_SYNCHRONOUS_IO )开始等待, 直到 I/O 完成。如果为异步 I/O 打开文件 - I/O 经理自己从不等待并返回呼叫者的状态,包括 STATUS_PENDING

我们可以通过调用CancelSynchronousIo 来打破阶段3的等待。但是如果在 2 阶段等待在驱动程序内部 - 不可能以任何方式打破这种等待。任何Cancel*Io*CloseHandle 都无济于事。如果我们使用异步文件句柄 - I/O 管理器永远不会在 3 中等待,如果 api 调用等待 - 它会在 2(驱动程序处理程序)中等待,我们无法中断等待.

作为结果 - 我们不能强制异步文件的 I/O 调用更快地返回。如果司机在某些情况下会等待。

还有更多 - 为什么我们不能中断驱动程序等待,但可以停止 I/O 管理器等待。因为未知 - 如何,在哪个对象(或只是睡眠)上,驱动程序等待哪个条件。如果我们在条件满足之前中断线程等待会发生什么......所以如果驱动程序等待 - 它将等待。如果 I/O 管理器 - 他等待 IRP 完成。并打破这个等待 - 需要完整的 IRP。对于此存在的 api,它将 IRP 标记为已取消并调用驱动程序回调(驱动程序必须设置此回调,以防它在完成请求之前返回)。此回调中的驱动程序完成 IRP,这是从等待中唤醒 I/O 管理器(再次它只等待同步文件)并返回给调用者

也很重要,不要混淆 - I/O 结束和 api 调用结束。如果是同步文件 - 这是相同的。 api 仅在 I/O 完成后返回。但对于异步 I/O,这是不同的事情 - I/O 在 api 调用返回后仍然可以处于活动状态(如果它返回 STATUS_PENDINGERROR_IO_PENDING 用于 win32 层)。

我们可以通过取消来要求 I/O 提前完成。通常(如果驱动程序设计良好)这项工作。但是我们不能要求 api 调用提前返回以防异步 I/O 文件。我们无法控制 I/O 调用(具体情况下为ReadFile)何时、多快返回。但可以提前取消 I/O 请求 I/O 调用 (ReadFile) return 之后。更准确地说是在驱动程序从 2 返回之后,并且因为 I/O 管理器从不等待 3 - 可以说 I/O 调用在驱动程序返回控制之后返回。


如果一个线程使用文件句柄,而另一个线程可以关闭它,没有任何同步 - 这当然会导致 raice 和错误。在最好的情况下,ERROR_INVALID_HANDLE 可以在另一个线程关闭句柄之后从 api 调用返回。在最坏的情况下 - 关闭后可以重用句柄,我们开始使用错误的句柄,结果未定义。为了避免这种情况,只需要在破旧保护内使用句柄(类似于 convert 对强的弱引用)。 演示实现:

class IoObject

    HANDLE _hFile = INVALID_HANDLE_VALUE;
    LONG _lock = 0x80000000;

public:
    HANDLE LockHandle() 
    
        LONG Value, PreviousValue;

        if (0 > (Value = _lock))
        
            do 
            
                PreviousValue = InterlockedCompareExchangeNoFence(&_lock, Value + 1, Value);

                if (PreviousValue == Value) return _hFile;

             while (0 > (Value = PreviousValue));
        
    
        return 0;
    

    void UnlockHandle()
    
        if (InterlockedDecrement(&_lock) == 0)
        
            _hFile = 0; // CloseHandle(_hFile)
        
    

    void Close()
    
        if (LockHandle())
        
            _interlockedbittestandreset(&_lock, 31);
            UnlockHandle();
        
    

    void WrongClose()
    
        _hFile = 0; // CloseHandle(_hFile)
    

    BOOL IsHandleClosed()
    
        return _hFile == 0;
    
;

ULONG WINAPI WorkThread(IoObject* pObj)

    ULONG t = GetTickCount();
    int i = 0x1000000;
    do 
    
        if (HANDLE hFile = pObj->LockHandle())
        
            SwitchToThread(); // simulate delay

            if (pObj->IsHandleClosed())
            
                __debugbreak();
            

            pObj->UnlockHandle();
        
        else
        
            DbgPrint("[%x]: handle closed ! (%u ms)\n", GetCurrentThreadId(), GetTickCount() - t);
            break;
        
     while (--i);

    return 0;


ULONG WINAPI WorkThreadWrong(IoObject* pObj)

    ULONG t = GetTickCount();
    int i = 0x10000000;
    do 
    
        if (pObj->IsHandleClosed())
        
            DbgPrint("[%x]: handle closed ! (%u ms)\n", GetCurrentThreadId(), GetTickCount() - t);
            break;
        
        
        SwitchToThread(); // simulate delay

        if (pObj->IsHandleClosed())
        
            __debugbreak();
        

     while (--i);

    return 0;


void CloseTest()

    IoObject obj;

    ULONG n = 8;
    do 
    
        if (HANDLE hThread = CreateThread(0, 0x1000, (PTHREAD_START_ROUTINE)WorkThread, &obj, 0, 0))
        
            CloseHandle(hThread);
        
     while (--n);

    Sleep(50);
//#define _WRONG_
#ifdef _WRONG_
    obj.WrongClose();
#else
    obj.Close();
#endif
    MessageBoxW(0,0,0,0);

通过WrongClose(); 调用,我们将在WorkThread[Wrong] 中永久捕获__debugbreak()(关闭后使用)。但是对于obj.Close();WorkThread,我们绝不能捕获异常。还要注意 Close()lock-free 并且它的调用者永远不会等待/挂起,即使在 rundown-protection 中的 api 调用会等待。

【讨论】:

“相反,您需要从控制线程调用 CloseHandle ——这有效地取消了所有当前文件上的 I/O。” 我确实考虑过这个,但这引入了另一个比赛条件:处理回收。该线程现在将在句柄上调用ReadFile,该句柄可能最终完全指向其他东西,从而导致损坏。这比我们试图预防的原始问题要糟糕得多。 @user541686 - 但我在回答中确切地写了这个 - 您必须仅在破旧保护内访问句柄。看看我的代码的结尾 - 这里是如何使用的。可能的用户模式implementation @user541686 如果我们对某些资源使用 rundown-protection(具体情况下的句柄),在我们获取它之后 - AcquireRundownProtection() 返回 true,我们保证句柄不会关闭,直到调用ReleaseRundownProtection()。从控制线程我们可以随时开始运行。在此之后 - 对 AcquireRundownProtection() 的任何调用都返回 false,但只有在最后一次调用 ReleaseRundownProtection() 后才会关闭句柄。非常接近 C++ 中的弱引用(如果它离你更近的话),除非你仍然将弱转换为强直到存在强引用。 @user541686 可以这么说 - 我们在对象 (IO_OBJECT) 中有逻辑弱引用来处理和在 I/O 调用中使用句柄之前 - 将弱引用转换为强引用而不是释放强引用.控制线程有强引用,当想要停止时 - 释放强引用。破败只添加标志 - 防止在开始破败后将弱转换为强,甚至仍然存在一些强 internal weak_ptr based on _Incref_nz() run-down 使用几乎这个(1位用作标志)

以上是关于在 Windows 中的另一个线程上异步启动和取消 I/O 的无竞争方式的主要内容,如果未能解决你的问题,请参考以下文章

CompletableFuture 异步超时 和取消

Android Lollipop 中的另一个通知正在取消通知

C ++从异步线程更新Windows窗口

套接字关闭时取消阻止 recvfrom

如何在 Windows 上从 C 中的另一个程序启动一个独立程序(在单独的控制台窗口中)?

为啥任务取消发生在调用者线程上?