使用 IAsyncEnumerable 读取文本文件

Posted

技术标签:

【中文标题】使用 IAsyncEnumerable 读取文本文件【英文标题】:Read text file with IAsyncEnumerable 【发布时间】:2020-03-21 09:44:57 【问题描述】:

我在测试 C# 8.0 功能时遇到了 IAsyncEnumerable。我从 Anthony Chu (https://anthonychu.ca/post/async-streams-dotnet-core-3-iasyncenumerable/) 那里找到了非凡的例子。它是异步流和Task<IEnumerable<T>>的替代品

// Data Access Layer.
public async IAsyncEnumerable<Product> GetAllProducts()

    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    
        foreach (var product in await iterator.ReadNextAsync())
        
            yield return product;
        
    


// Usage
await foreach (var product in productsRepository.GetAllProducts())

    Console.WriteLine(product);

我想知道这是否可以应用于读取文本文件,如下所示,逐行读取文件。

foreach (var line in File.ReadLines("Filename"))

    // ...process line.

我真的很想知道如何将 async with IAsyncEnumerable&lt;string&gt;() 应用到上面的 foreach 循环中,以便它在读取时流式传输。

如何实现迭代器,以便可以使用 yield return 逐行读取?

【问题讨论】:

【参考方案1】:

完全一样,但是没有异步工作负载,所以让我们假装

public async IAsyncEnumerable<string> SomeSortOfAwesomeness()

   foreach (var line in File.ReadLines("Filename.txt"))
   
       // simulates an async workload, 
       // otherwise why would be using IAsyncEnumerable?
       // -- added due to popular demand 
       await Task.Delay(100);
       yield return line;
   

这只是一个封装的 APM 工作负载,请参阅Stephen Clearys cmets 了解详情

public static async IAsyncEnumerable<string> SomeSortOfAwesomeness()

   using StreamReader reader = File.OpenText("Filename.txt");
   while(!reader.EndOfStream)
      yield return await reader.ReadLineAsync();

用法

await foreach(var line in SomeSortOfAwesomeness())

   Console.WriteLine(line);

更新来自Stephen Cleary

File.OpenText 很遗憾只允许同步 I/O异步 API 是 在这种情况下实施不佳。要打开一个真正的异步文件, 您需要使用 FileStream 构造函数传递 isAsync: true 或 FileOptions.Asynchronous.

ReadLineAsync 基本上导致了这段代码,如你所见,它只是封装了 Stream APM BeginEnd 方法

private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
            
     return TaskFactory<Int32>.FromAsyncTrim(
                    this, new ReadWriteParameters  Buffer = buffer, Offset = offset, Count = count ,
                    (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                    (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler

【讨论】:

什么!!!脑洞大开。我正在考虑某种真正异步的hack。所以微软不打算在IO操作中加入更多的异步方法。 我可以说第二种方法比第一种方法接近异步吗?我的目标是订阅这个流并尝试一些反应性的东西。 @phonemyatt 第一个不需要IAsyncEnumerable 没有Task.Delay。第二个实际上使用 StreamReader 类的 async 方法,它以最小的开销包装了 AMP 方法的读取 File.OpenText 遗憾的是只允许同步 I/O;在这种情况下,异步 API 的实现很差。要打开真正的异步文件,您需要使用 FileStream 构造函数传递 isAsync: trueFileOptions.Asynchronous @StephenCleary 感谢您的澄清,我更新了【参考方案2】:

我做了一些性能测试,似乎大的bufferSize 以及FileOptions.SequentialScan 选项很有帮助。

public static async IAsyncEnumerable<string> ReadLinesAsync(string filePath)

    using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
        FileShare.Read, 32768, FileOptions.Asynchronous | FileOptions.SequentialScan);
    using var reader = new StreamReader(stream);
    while (true)
    
        var line = await reader.ReadLineAsync().ConfigureAwait(false);
        if (line == null) break;
        yield return line;
    

虽然枚举并不是真正的异步。根据我的实验,StreamReader 类的xxxAsync 方法阻塞当前线程的持续时间长于它们返回的Task 的等待时间。例如,在我的 PC 中使用 ReadToEndAsync 方法读取 6 MB 文件会在返回任务之前阻塞当前线程 120 毫秒,然后任务仅在 20 毫秒内完成。所以我不确定使用这些方法有多大价值。通过使用同步 API 和一些 Linq.Async,伪造异步要容易得多:

IAsyncEnumerable<string> lines = File.ReadLines("SomeFile.txt").ToAsyncEnumerable();

【讨论】:

以上是关于使用 IAsyncEnumerable 读取文本文件的主要内容,如果未能解决你的问题,请参考以下文章

QT中怎样读取中文文本文件!

labview如何读取文本文档中某一行的字符串

读取大文件的最佳方式(例如非常大的文本文档)

读取一行文本文件,拆分为数组

Matlab读取文本文档txt文件

如何将文本文档批量拆分为变量