NamedPipeServerStream.ReadAsync() 在 CancellationToken 请求取消时不退出

Posted

技术标签:

【中文标题】NamedPipeServerStream.ReadAsync() 在 CancellationToken 请求取消时不退出【英文标题】:NamedPipeServerStream.ReadAsync() does not exit when CancellationToken requests cancellation 【发布时间】:2019-03-09 00:04:53 【问题描述】:

当 NamedPipeServer 流从管道读取任何数据时,它不会对CancellationTokenSource.Cancel()做出反应

这是为什么呢?

如何限制我在服务器中等待来自客户端的数据的时间?

要重现的代码:

static void Main(string[] args)

    Server();
    Clinet();
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();


private static async Task Server()

    using (var cancellationTokenSource = new CancellationTokenSource(1000))
    using (var server = new NamedPipeServerStream("test",
        PipeDirection.InOut,
        1,
        PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous))
    
        var cancellationToken = cancellationTokenSource.Token;
        await server.WaitForConnectionAsync(cancellationToken);
        await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
        var buffer = new byte[4];
        await server.ReadAsync(buffer, 0, 4, cancellationToken);
        Console.WriteLine("exit server");
    


private static async Task Clinet()

    using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
    
        var buffer = new byte[4];
        client.Connect();
        client.Read(buffer, 0, 4);
        await Task.Delay(5000);
        await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
        Console.WriteLine("client exit");
    

预期结果:

exit server
<client throws exception cuz server closed pipe>

实际结果:

client exit
exit server

编辑

CancelIo 的答案似乎很有希望,它确实允许服务器在取消令牌被取消时结束通信。 但是,我不明白为什么我的“基本场景”在使用ReadPipeAsync 时停止工作。

这里是代码,它包括 2 个客户端函数:

    Clinet_ShouldWorkFine - 一个读/写及时的好客户端 Clinet_ServerShouldEndCommunication_CuzClientIsSlow - 客户端太慢,服务器应该结束通信

预期:

    Clinet_ShouldWorkFine - 执行结束,没有任何异常 Clinet_ServerShouldEndCommunication_CuzClientIsSlow - 服务器关闭管道,客户端抛出异常

实际:

    Clinet_ShouldWorkFine - 服务器在第一次调用 ReadPipeAsync 时停止,管道在 1 秒后关闭,客户端抛出异常 Clinet_ServerShouldEndCommunication_CuzClientIsSlow - 服务器关闭管道,客户端抛出异常

为什么服务器使用ReadPipeAsyncClinet_ShouldWorkFine不起作用

class Program

    static void Main(string[] args) 
        // in this case server should close the pipe cuz client is too slow
        try 
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => 
                Console.WriteLine($"Server exited, cancelled=c.IsCanceled");
            );
            tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
            Task.WhenAll(tasks).Wait();
        
        catch (Exception ex) 
            Console.WriteLine(ex);
        

        // in this case server should exchange data with client fine
        try 
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => 
                Console.WriteLine($"Server exited, cancelled=c.IsCanceled");
            );
            tasks[2] = Clinet_ShouldWorkFine();
            Task.WhenAll(tasks).Wait();
        
        catch (Exception ex) 
            Console.WriteLine(ex);
        

        Console.WriteLine("press [enter] to exit");
        Console.ReadLine();
    

    private static async Task Server()
    
        using (var cancellationTokenSource = new CancellationTokenSource(1000))
        using (var server = new NamedPipeServerStream("test",
            PipeDirection.InOut,
            1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous))
        
            var cancellationToken = cancellationTokenSource.Token;
            await server.WaitForConnectionAsync(cancellationToken);
            await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
            await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
            var buffer = new byte[4];
            var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            Console.WriteLine("exit server");
        
    

    private static async Task Clinet_ShouldWorkFine()
    
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
            await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
            Console.WriteLine("client exit");
        
    

    private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
    
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await Task.Delay(5000);
            await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
            await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
            Console.WriteLine("client exit");
        
    


public static class AsyncPipeFixer 

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) 
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => 
            try  return pipe.EndRead(async); 
            finally  registration.Dispose(); 
        , cancellationToken);
    

    private static void CancelPipeIo(PipeStream pipe) 
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try 
            CancelIo(pipe.SafePipeHandle);
        
        catch (ObjectDisposedException)  
    
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);


【问题讨论】:

@MrinalKamboj 我正在使用new CancellationTokenSource(1000),它在指定的时间过去后调用.Cancel() - 在这种情况下是在1000ms之后 @MrinalKamboj 我应该在哪里添加Task.Delay(1000)?对不起,我不明白。附注:上面的代码准确地展示了我遇到这个问题的真实情况。我从 C# 运行一个 python 脚本并通过管道与它交谈。我不能在这里或那里添加延迟,因为我知道 C# 完全卡在ReadAsync() 【参考方案1】:

.NET 程序员在编写像这样的小测试程序时会遇到 async/await 的可怕麻烦。它的构图很差,一路上都是乌龟。这个程序缺少最后的海龟,任务陷入僵局。没有人关心让任务继续执行,就像在(比如说)GUI 应用程序中通常发生的那样。调试起来也非常困难。

首先做一个小改动,让死锁完全可见:

   int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

这需要一个令人讨厌的小角落案例,Server 方法一直到“Server exited”消息。 Task 类的一个长期问题是,当任务完成或等待的方法同步完成时,它将尝试直接运行延续。这恰好在这个程序中起作用。通过强制它获取异步结果,死锁现在很明显。


下一步是修复 Main(),这样这些任务就不会再死锁了。可能是这样的:

static void Main(string[] args) 
    try 
        var tasks = new Task[3];
        tasks[0] = Server();
        tasks[1] = tasks[0].ContinueWith(c => 
            Console.WriteLine($"Server exited, cancelled=c.IsCanceled");
        );
        tasks[2] = Clinet();
        Task.WhenAll(tasks).Wait();
    
    catch (Exception ex) 
        Console.WriteLine(ex);
    
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();


现在我们有机会提前解决取消问题。 NamedPipeServerStream 类本身不实现 ReadAsync,它从其基类之一 Stream 继承方法。它有一个糟糕的小细节,完全没有记录。只有盯着framework source code 才能看到。它只能在您调用 ReadAsync()之前发生取消时检测到取消。一旦开始读取,它就再也看不到取消了。您要解决的终极问题。

这是一个可以解决的问题,但我有一个模糊的想法,为什么微软没有为 PipeStreams 这样做。强制 BeginRead() 方法提前完成的正常方法是 Dispose() 对象,这也是 Stream.ReadAsync() 可以中断的唯一方法。但还有另一种方法,在 Windows 上可以使用CancelIo() 中断 I/O 操作。让我们把它变成一个扩展方法:

using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;

public static class AsyncPipeFixer 

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) 
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => 
            try  return pipe.EndRead(async); 
            finally  registration.Dispose(); 
        , cancellationToken);
    

    private static void CancelPipeIo(PipeStream pipe) 
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try 
            CancelIo(pipe.SafePipeHandle);
        
        catch (ObjectDisposedException)  
    
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);


最后调整服务器以使用它:

    int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

请注意,此解决方法特定于 Windows,因此无法在面向 Unix 风格的 .NETCore 程序中运行。然后考虑更重的锤子,在 CancelPipeIo() 方法中调用 pipe.Close()。

【讨论】:

这真的很有趣。我现在明白为什么服务器是deaf 来取消消息了。但是我不知道为什么在我的Happy path 中使用ReadPipeAsync 不起作用。在我的原始帖子中添加了代码以演示“好”客户端的问题 请更清楚“不起作用”和“快乐的道路”是什么意思。正如所写的客户端不应该工作,你应该得到“管道坏了”异常消息。 我在原始帖子中添加了一个编辑。它包括 2 个“客户”。 1.Clinet_ServerShouldEndCommunication_CuzClientIsSlow()是一个慢客户端的表示,它有一个Task.Delay(5000),在这种情况下服务器应该结束通信,因为客户端太慢了。 2.Clinet_ShouldWorkFine() 代表HappyPath 即。作为一个表现良好的客户端,服务器应该能够与Clinet_ShouldWorkFine() 交换数据而不会出现任何错误/异常。 这里应该有AsyncCallback 而不是null?我看到的几乎所有类似的命名管道代码都有回调。只是想知道为什么这里省略了它? 我试过了,正如上面OP修改过的问题一样,发现它对我不起作用。什么工作,是用Task.Run 替换ReadPipeAsync 中的new Task&lt;int&gt;。但我并没有假装对此了解得足够多,以知道我可能会造成什么其他混乱!【参考方案2】:

ReadAsync 首先检查是否取消,如果令牌取消则开始读取它没有效果

添加以下行

cancellationToken.Register(server.Disconnect);

using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
    PipeDirection.InOut,
    1,
    PipeTransmissionMode.Byte,
    PipeOptions.Asynchronous))

    var cancellationToken = cancellationTokenSource.Token;
    cancellationToken.Register(server.Disconnect);
    await server.WaitForConnectionAsync(cancellationToken);
    await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
    Console.WriteLine("exit server");

【讨论】:

Sever.Disconnect 如何在此处帮助取消调用。 OP 正在寻找没有发生的异常 如果服务器的管道关闭,Clinet() 方法将抛出异常。正如预期的结果。 真正的异常应该来自服务器,客户端不应该制造异常,基于像断开连接这样的简单事件 @Milad,谢谢。我实际上目前正在使用cancellationToken.Register(server.Disconnect); 作为解决方法,它确实处理了我正确发布的初始案例。但是,当客户端不从管道读取数据时,它不起作用。在这种情况下,服务器等待WriteAsync,当server.Disconnect 发生时WriteAsync 抛出PipeClosedException 或类似情况。【参考方案3】:

我只是在看你的代码,也许还有一双新的眼睛……

据我所知,在您的原始场景和更复杂的场景中......您正在传递一个已经取消的取消令牌,这是非常不可预测的,其他人如何实现(如果有)在方法中抛出的异常...... .

使用IsCancellationRequested 属性检查令牌是否已被取消并且不要传递已取消的令牌。

这里是从原始问题将其添加到您的代码中的示例(您可以为以后的 ReadPipeAsync 方法执行相同的操作。

var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);

if(!cancellationToken.IsCancellationRequested)

    await server.WriteAsync(new byte[]  1, 2, 3, 4 , 0, 4, cancellationToken);


if(!cancellationToken.IsCancellationRequested)

    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);


Console.WriteLine("exit server");

上面的代码会导致

exit server
client exit

我认为这也是你最原始的问题......

【讨论】:

【参考方案4】:

Hans Passant 的答案是理想的……几乎。唯一的问题是CancelIo() 取消了从同一个线程完成的请求。如果任务在不同的线程上恢复,这将不起作用。不幸的是,我没有足够的声望点来直接评论他的答案,因此单独回答。

所以他的示例代码的最后一部分应该改写如下:

    private static void CancelPipeIo(PipeStream pipe) 
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try 
            CancelIoEx(pipe.SafePipeHandle);
        
        catch (ObjectDisposedException)  
    
    [DllImport("kernel32.dll")]
    private static extern bool CancelIoEx(SafePipeHandle handle, IntPtr _ = default);

请注意,CancelIoEx() 在 Vista/Server 2008 及更高版本中可用,而CancelIo() 在 Windows XP 中也可用。

【讨论】:

以上是关于NamedPipeServerStream.ReadAsync() 在 CancellationToken 请求取消时不退出的主要内容,如果未能解决你的问题,请参考以下文章