CLR 异步函数
Posted mingjie-c
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CLR 异步函数相关的知识,希望对你有一定的参考价值。
windows是如何执行I/O操作的?计算机的每个模块都有自己的微型处理器,当写文件到磁盘中时,操作系统将写文件的任务交给磁盘的处理单元就可以做其他的了。还有需要TCP/IP 与另一台电脑通信时,系统只要将发送的数据写入TCP的缓存区就可以做其他的了,发送数据由网卡处理单元完成。
但是这些模块的处理单元的计算能力远不如CPU快,如果将CPU的计算资源总是和这些模块的处理单元同步的话,就会影响应用程序的性能,给用户带来不好的体验。这一节就讲解如何执行计算限制的操作,利用线程池在多个CPU 内核上调度任务,使多个线程并发工作,从而高效使用系统资源。
1 Windows 如何执行I/O 操作
用一个从磁盘中读取文件中的数据为例:
1 程序通过构造一个 FileStream 对象来打开磁盘文件,然后调用 Read 方法从文件读取数据。
2 调用 FileStream 的Read 方法时,你的线程从托管代码转变为本机/用户模式代码,Read 内部调用 Win32 ReadFile 函数。
3 ReadFile 分配一个小的数据结构,称为 I/O 请求包 (I/O Request Packet,IRP)。 然后 ReadFile 将你的线程从本机/ 用户模式代码转变成本机/内核模式代码,向内核传递 IRP 数据结构,从而调用 Windows 内核。
4 根据 IRP 中的设备句柄, Windows 内核知道I/O 操作要传送给哪个硬件设备。因此,Windows 将IRP 传送给恰当的设备驱动程序的IRP 队列。
5 每个设备驱动程序都维护着自己的 IRP 队列,其中包含了机器上运行的所有进程发出的I/O 请求。IRP 数据包到达时,设备驱动程序将IRP 信息传递给物理硬件设备上安装的电路板。现在,硬件设备将执行这些请求的I/O 操作。
6 在硬件设备执行I/O 操作期间,发出了I/O 请求的线程将无事可做,所以 Windows 将线程变成睡眠状态,防止浪费CPU 时间。
7 最终,硬件设备会完成 I/O 操作。然后,Windows 会唤醒你的线程,把它调度给一个 CPU ,使它从内核模式返回用户模式,再返回至托管代码。
现在讨论一下 Windows 如何执行异步I/O 操作。引入了 CLR 的线程池。打开磁盘文件的方式任然是通过构造一个 FileStream 对象,但现在传递了一个 FileOptions.Asynchronous 标志。告诉Windows 我希望文件的读/写 操作以异步方式执行。
-
1 现在调用 ReadAsync 而不是 Read 从文件中读取数据。ReadAsync 内部分配一个 Task<Int32> 对象来代表用于完成读取操作的代码。然后,ReadAsync 调用 Win32 ReadFile函数。
-
2 ReadFile 分配 IRP 添加到硬盘驱动程序的 IRP 队列中。但线程不再阻塞,而是允许返回至你的代码。
-
3 那什么时候以及什么方式处理最终读取的数据呢?
注意:调用 ReadAsync 返回的是一个 Task<Int32> 对象,可在该对象调用 ContinueWith 来登记任务完成时执行的回调方法。也可以用C# 的异步函数功能简化编码。
-
4 硬件设备处理好IRP 后,会将完成的 IRP 放到 CLR 的线程池队列中。将来的某个时候,一个线程池线程会提取完成的 IRP 并执行完成任务的代码,最终要么设置异常,要么返回结果。这样一来,Task 对象就知道操作在什么时候完成,代码可以开始运行并安全地访问 Byte[] 中的数据。
CLR 的线程池使用名为 “I/O完成端口”(I/O Completion Port)的 Windows 资源来引出我刚才描述的行为。CLR 在初始化时创建一个 I/O 完成端口。当你打开硬件设备时,这些设备可以和I/O 完成端口关联,使设备驱动程序知道完成的IRP 送到哪。
以异步方式执行 I/O 操作有很多好处:
-
1 将资源利用率降到最低,并减少上下文切换。
-
2 每开始一次垃圾回收,CLR 都会挂起进程中的所有线程。所以,线程越少,垃圾回收器运行的速度越快。
-
3 垃圾回收时,CLR 遍历所有线程栈来查找根,同样线程越少,栈的数量越少,使垃圾回收速率变得更快。
C# 的异步函数
Microsoft 设计了一个编程模型来帮助开发者利用这种异步操作能力。该模式利用了上一章的 Task 和 称为 异步函数 的一个C# 语言功能。以下代码使用异步函数来执行两个异步 I/O 操作。
private static async Task<String> IssueClientRequestAsync(String serverName, String message) {
using (var pipe = new NamedPipeClientStream(serverName, "PipeName", PipeDirection.InOut,PipeOptions.Asynchronous | PipeOptions.WriteThrough)) {
pipe.Connect(); // Must Connect before setting ReadMode
pipe.ReadMode = PipeTransmissionMode.Message;
// Asynchronously send data to the server
Byte[] request = Encoding.UTF8.GetBytes(message);
await pipe.WriteAsync(request, 0, request.Length);
// Asynchronously read the server‘s response
Byte[] response = new Byte[1000];
Int32 bytesRead = await pipe.ReadAsync(response, 0, response.Length);
return Encoding.UTF8.GetString(response, 0, bytesRead);
} // Close the pipe
}
下面来解释一下上述代码中的异步函数执行过程。对于理解await 非常重要。
-
方法标记为 async ,编译器就会将方法的代码转换成实现了状态机的一个类型。这就允许线程执行状态机中的一些代码并返回,方法不需要一直执行到结束。
-
调用WriteAsync时,在WriteAsync 内部分配了一个 Task 对象并把它返回给IssueClientRequestAsync ,此时,C# await 操作符实际会在 Task 对象上调用ContinueWith ,向它传递用于恢复状态机的方法。然后线程从 IssueClientRequestAsync返回。
-
在将来某个时候,网络设备驱动程序会结束向管道的写入,一个线程池线程会通知Task 对象,后者激活ContinueWith回调方法,造成一个线程恢复状态机。更具体的说,一个线程会重新进入 IssueClientRequestAsync 方法,但这次是从 await 操作符的位置开始的。
-
方法现在执行编译器生成的、用于查询Task 对象状态的代码。如果操作失败,会设置代表错误的一个异常。如果操作成功完成,await 操作符会返回结果。本列子中,WriteAsync 返回一个Task 而不是Task<TResult>, 所以无返回值。
-
现在方法继续执行,分配一个Byte[] 并调用 NamedPipeClientStream 的异步 ReadAsync 方法。ReadAsync 内部创建一个 Task<Int32>对象并返回它。同样的,await 操作符实际会在Task<Int32>对象上调用 ContinueWith,向其传递用于恢复状态机的方法。然后线程再次从 IssueClientRequestAsync 返回。
-
将来的某个时候,服务器向客户机发送一个响应,网络设备驱动程序获得这个响应,一个线程池线程通知 Task<Int32>对象,后者恢复状态机。await 操作符造成编译器生成代码来查询 Task对象的Result 属性(一个 Int32)并将结果赋给局部变量 bytesRead。如果操作失败,则抛出异常。然后执行 IssueClientRequestAsync 剩余的代码,返回结果字符串并关闭管道。此时状态机执行完毕,垃圾回收器会回收任何内存。
-
调用者如何知道 IssueClientRequestAsync 已经执行完毕它的状态机呢?一旦将方法标记为 async,编译器会自动生成代码,在状态机开始执行时创建一个 Task 对象。该Task 对象在状态机执行完毕时自动完成。注意 IssueClientRequestAsync 方法的返回类型是 Task<String>,它实际返回的是由编译器生成的代码为这个方法(IssueClientRequestAsync 方法)的调用者而创建的Task<String> 对象,Task 的Result 属性在本列中是 String 类型。在IssueClientRequestAsync 方法靠近尾部的地方,我反回了一个字符串。这造成编译器生成的代码完成它创建的 Task<String>对象,把对象的Result 属性设为返回的字符串。
注意:异步函数存在以下限制。
-
1 不能转变为异步函数的情况,Main方法、构造器、属性访问器方法和 事件访问器方法。
-
2 异步函数不能使用任何 out 或 ref 参数。
-
3 不能在 catch ,finally 或 unsafe 块中使用 await 操作符。
-
4 不能在 await 操作符之前获得一个支持线程所有权 或递归的锁,并在 await 操作符之后释放它。 因为 await 之前的代码由一个线程执行,而await 之后的代码由另一个线程执行。
-
5 在查询表达式中,await 操作符只能在初始 from 子句的第一个集合表达式中使用,或者在 join 子句的集合表达式中使用。
异步函数扩展性
在扩展性方面,用Task对象包装一个将来完成的操作,就可以用await 操作符来等待该操作。下面是Jeffrey Richter 写的一个TaskLogger 类,它可以显示尚未完成的异步操作。我们可以在调试的时候使用。
public static class TaskLogger {
public enum TaskLogLevel { None, Pending }
public static TaskLogLevel LogLevel { get; set; }
public sealed class TaskLogEntry {
public Task Task { get; internal set; }
public String Tag { get; internal set; }
public DateTime LogTime { get; internal set; }
public String CallerMemberName { get; internal set; }
public String CallerFilePath { get; internal set; }
public Int32 CallerLineNumber { get; internal set; }
public override string ToString() {
return String.Format("LogTime={0}, Tag={1}, Member={2}, File={3}({4})",
LogTime, Tag ?? "(none)", CallerMemberName, CallerFilePath, CallerLineNumber);
}
}
private static readonly ConcurrentDictionary<Task,TaskLogEntry> s_log =
new ConcurrentDictionary<Task, TaskLogEntry>();
public static IEnumerable<TaskLogEntry> GetLogEntries() { return s_log.Values; }
public static Task<TResult> Log<TResult>(this Task<TResult> task, String tag = null,
[CallerMemberName] String callerMemberName = null,
[CallerFilePath] String callerFilePath = null,
[CallerLineNumber] Int32 callerLineNumber = -1) {
return (Task<TResult>)
Log((Task)task, tag, callerMemberName, callerFilePath, callerLineNumber);
}
public static Task Log(this Task task, String tag = null,
[CallerMemberName] String callerMemberName = null,
[CallerFilePath] String callerFilePath = null,
[CallerLineNumber] Int32 callerLineNumber = •1) {
if (LogLevel == TaskLogLevel.None) return task;
var logEntry = new TaskLogEntry {
Task = task,
LogTime = DateTime.Now,
Tag = tag,
CallerMemberName = callerMemberName,
CallerFilePath = callerFilePath,
CallerLineNumber = callerLineNumber
};
s_log[task] = logEntry;
task.ContinueWith(t