NamedPipeServerStream.ReadAsync() 在 CancellationToken 请求取消时不退出
Posted
技术标签:
【中文标题】NamedPipeServerStream.ReadAsync() 在 CancellationToken 请求取消时不退出【英文标题】:NamedPipeServerStream.ReadAsync() does not exit when CancellationToken requests cancellation 【发布时间】:2019-03-09 00:04:53 【问题描述】:当 NamedPipeServer 流从管道读取任何数据时,它不会对CancellationTokenSource.Cancel()
做出反应
这是为什么呢?
如何限制我在服务器中等待来自客户端的数据的时间?
要重现的代码:
static void Main(string[] args)
Server();
Clinet();
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
private static async Task Server()
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
private static async Task Clinet()
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
Console.WriteLine("client exit");
预期结果:
exit server
<client throws exception cuz server closed pipe>
实际结果:
client exit
exit server
编辑
CancelIo
的答案似乎很有希望,它确实允许服务器在取消令牌被取消时结束通信。
但是,我不明白为什么我的“基本场景”在使用ReadPipeAsync
时停止工作。
这里是代码,它包括 2 个客户端函数:
Clinet_ShouldWorkFine
- 一个读/写及时的好客户端
Clinet_ServerShouldEndCommunication_CuzClientIsSlow
- 客户端太慢,服务器应该结束通信
预期:
Clinet_ShouldWorkFine
- 执行结束,没有任何异常
Clinet_ServerShouldEndCommunication_CuzClientIsSlow
- 服务器关闭管道,客户端抛出异常
实际:
Clinet_ShouldWorkFine
- 服务器在第一次调用 ReadPipeAsync
时停止,管道在 1 秒后关闭,客户端抛出异常
Clinet_ServerShouldEndCommunication_CuzClientIsSlow
- 服务器关闭管道,客户端抛出异常
为什么服务器使用ReadPipeAsync
时Clinet_ShouldWorkFine
不起作用
class Program
static void Main(string[] args)
// in this case server should close the pipe cuz client is too slow
try
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c =>
Console.WriteLine($"Server exited, cancelled=c.IsCanceled");
);
tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
Task.WhenAll(tasks).Wait();
catch (Exception ex)
Console.WriteLine(ex);
// in this case server should exchange data with client fine
try
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c =>
Console.WriteLine($"Server exited, cancelled=c.IsCanceled");
);
tasks[2] = Clinet_ShouldWorkFine();
Task.WhenAll(tasks).Wait();
catch (Exception ex)
Console.WriteLine(ex);
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
private static async Task Server()
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
var buffer = new byte[4];
var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
private static async Task Clinet_ShouldWorkFine()
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
Console.WriteLine("client exit");
private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
await client.WriteAsync(new byte[] 1, 2, 3, 4, 0, 4);
Console.WriteLine("client exit");
public static class AsyncPipeFixer
public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() =>
try return pipe.EndRead(async);
finally registration.Dispose();
, cancellationToken);
private static void CancelPipeIo(PipeStream pipe)
// Note: no PipeStream.IsDisposed, we'll have to swallow
try
CancelIo(pipe.SafePipeHandle);
catch (ObjectDisposedException)
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);
【问题讨论】:
@MrinalKamboj 我正在使用new CancellationTokenSource(1000)
,它在指定的时间过去后调用.Cancel()
- 在这种情况下是在1000ms之后
@MrinalKamboj 我应该在哪里添加Task.Delay(1000)
?对不起,我不明白。附注:上面的代码准确地展示了我遇到这个问题的真实情况。我从 C# 运行一个 python 脚本并通过管道与它交谈。我不能在这里或那里添加延迟,因为我知道 C# 完全卡在ReadAsync()
。
【参考方案1】:
.NET 程序员在编写像这样的小测试程序时会遇到 async/await 的可怕麻烦。它的构图很差,一路上都是乌龟。这个程序缺少最后的海龟,任务陷入僵局。没有人关心让任务继续执行,就像在(比如说)GUI 应用程序中通常发生的那样。调试起来也非常困难。
首先做一个小改动,让死锁完全可见:
int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);
这需要一个令人讨厌的小角落案例,Server 方法一直到“Server exited”消息。 Task 类的一个长期问题是,当任务完成或等待的方法同步完成时,它将尝试直接运行延续。这恰好在这个程序中起作用。通过强制它获取异步结果,死锁现在很明显。
下一步是修复 Main(),这样这些任务就不会再死锁了。可能是这样的:
static void Main(string[] args)
try
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c =>
Console.WriteLine($"Server exited, cancelled=c.IsCanceled");
);
tasks[2] = Clinet();
Task.WhenAll(tasks).Wait();
catch (Exception ex)
Console.WriteLine(ex);
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
现在我们有机会提前解决取消问题。 NamedPipeServerStream 类本身不实现 ReadAsync,它从其基类之一 Stream 继承方法。它有一个糟糕的小细节,完全没有记录。只有盯着framework source code 才能看到。它只能在您调用 ReadAsync()之前发生取消时检测到取消。一旦开始读取,它就再也看不到取消了。您要解决的终极问题。
这是一个可以解决的问题,但我有一个模糊的想法,为什么微软没有为 PipeStreams 这样做。强制 BeginRead() 方法提前完成的正常方法是 Dispose() 对象,这也是 Stream.ReadAsync() 可以中断的唯一方法。但还有另一种方法,在 Windows 上可以使用CancelIo() 中断 I/O 操作。让我们把它变成一个扩展方法:
using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;
public static class AsyncPipeFixer
public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() =>
try return pipe.EndRead(async);
finally registration.Dispose();
, cancellationToken);
private static void CancelPipeIo(PipeStream pipe)
// Note: no PipeStream.IsDisposed, we'll have to swallow
try
CancelIo(pipe.SafePipeHandle);
catch (ObjectDisposedException)
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);
最后调整服务器以使用它:
int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);
请注意,此解决方法特定于 Windows,因此无法在面向 Unix 风格的 .NETCore 程序中运行。然后考虑更重的锤子,在 CancelPipeIo() 方法中调用 pipe.Close()。
【讨论】:
这真的很有趣。我现在明白为什么服务器是deaf
来取消消息了。但是我不知道为什么在我的Happy path
中使用ReadPipeAsync
不起作用。在我的原始帖子中添加了代码以演示“好”客户端的问题
请更清楚“不起作用”和“快乐的道路”是什么意思。正如所写的客户端不应该工作,你应该得到“管道坏了”异常消息。
我在原始帖子中添加了一个编辑。它包括 2 个“客户”。 1.Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
是一个慢客户端的表示,它有一个Task.Delay(5000)
,在这种情况下服务器应该结束通信,因为客户端太慢了。 2.Clinet_ShouldWorkFine()
代表HappyPath
即。作为一个表现良好的客户端,服务器应该能够与Clinet_ShouldWorkFine()
交换数据而不会出现任何错误/异常。
这里应该有AsyncCallback
而不是null
?我看到的几乎所有类似的命名管道代码都有回调。只是想知道为什么这里省略了它?
我试过了,正如上面OP修改过的问题一样,发现它对我不起作用。什么工作,是用Task.Run
替换ReadPipeAsync 中的new Task<int>
。但我并没有假装对此了解得足够多,以知道我可能会造成什么其他混乱!【参考方案2】:
ReadAsync 首先检查是否取消,如果令牌取消则开始读取它没有效果
添加以下行
cancellationToken.Register(server.Disconnect);
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
var cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(server.Disconnect);
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]1,2,3,4, 0, 4, cancellationToken);
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
【讨论】:
Sever.Disconnect
如何在此处帮助取消调用。 OP 正在寻找没有发生的异常
如果服务器的管道关闭,Clinet() 方法将抛出异常。正如预期的结果。
真正的异常应该来自服务器,客户端不应该制造异常,基于像断开连接这样的简单事件
@Milad,谢谢。我实际上目前正在使用cancellationToken.Register(server.Disconnect);
作为解决方法,它确实处理了我正确发布的初始案例。但是,当客户端不从管道读取数据时,它不起作用。在这种情况下,服务器等待WriteAsync
,当server.Disconnect
发生时WriteAsync
抛出PipeClosedException
或类似情况。【参考方案3】:
我只是在看你的代码,也许还有一双新的眼睛……
据我所知,在您的原始场景和更复杂的场景中......您正在传递一个已经取消的取消令牌,这是非常不可预测的,其他人如何实现(如果有)在方法中抛出的异常...... .
使用
IsCancellationRequested
属性检查令牌是否已被取消并且不要传递已取消的令牌。
这里是从原始问题将其添加到您的代码中的示例(您可以为以后的 ReadPipeAsync
方法执行相同的操作。
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
if(!cancellationToken.IsCancellationRequested)
await server.WriteAsync(new byte[] 1, 2, 3, 4 , 0, 4, cancellationToken);
if(!cancellationToken.IsCancellationRequested)
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
上面的代码会导致
exit server
client exit
我认为这也是你最原始的问题......
【讨论】:
【参考方案4】:Hans Passant 的答案是理想的……几乎。唯一的问题是CancelIo()
取消了从同一个线程完成的请求。如果任务在不同的线程上恢复,这将不起作用。不幸的是,我没有足够的声望点来直接评论他的答案,因此单独回答。
所以他的示例代码的最后一部分应该改写如下:
private static void CancelPipeIo(PipeStream pipe)
// Note: no PipeStream.IsDisposed, we'll have to swallow
try
CancelIoEx(pipe.SafePipeHandle);
catch (ObjectDisposedException)
[DllImport("kernel32.dll")]
private static extern bool CancelIoEx(SafePipeHandle handle, IntPtr _ = default);
请注意,CancelIoEx()
在 Vista/Server 2008 及更高版本中可用,而CancelIo()
在 Windows XP 中也可用。
【讨论】:
以上是关于NamedPipeServerStream.ReadAsync() 在 CancellationToken 请求取消时不退出的主要内容,如果未能解决你的问题,请参考以下文章