为啥使用 IAsyncEnumerable 比返回 async/await Task<T> 慢?

Posted

技术标签:

【中文标题】为啥使用 IAsyncEnumerable 比返回 async/await Task<T> 慢?【英文标题】:Why is using IAsyncEnumerable slower than returning async/await Task<T>?为什么使用 IAsyncEnumerable 比返回 async/await Task<T> 慢? 【发布时间】:2020-05-03 13:06:51 【问题描述】:

我目前正在测试 C# 8 的异步流,似乎当我尝试使用使用 async/await 并返回 Task> 的旧模式运行应用程序时,它似乎更快。 (我用秒表测量它并尝试多次运行它,结果是我提到的旧模式似乎比使用 IAsyncEnumerable 快一些)。

这是我编写的一个简单的控制台应用程序(我也在想我可能以错误的方式从数据库中加载数据)

class Program
    
        static async Task Main(string[] args)
        

            // Using the old pattern 
            //Stopwatch stopwatch = Stopwatch.StartNew();
            //foreach (var person in await LoadDataAsync())
            //
            //    Console.WriteLine($"Id: person.Id, Name: person.Name");
            //
            //stopwatch.Stop();
            //Console.WriteLine(stopwatch.ElapsedMilliseconds);


            Stopwatch stopwatch = Stopwatch.StartNew();
            await foreach (var person in LoadDataAsyncStream())
            
                Console.WriteLine($"Id: person.Id, Name: person.Name");
            
            stopwatch.Stop();
            Console.WriteLine(stopwatch.ElapsedMilliseconds);


            Console.ReadKey();
        


        static async Task<IEnumerable<Person>> LoadDataAsync()
        
            string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
            var people = new List<Person>();
            using (SqlConnection connection = new SqlConnection(connectionString))
            
                //SqlDataReader
                await connection.OpenAsync();

                string sql = "Select * From Person";
                SqlCommand command = new SqlCommand(sql, connection);

                using (SqlDataReader dataReader = await command.ExecuteReaderAsync())
                
                    while (await dataReader.ReadAsync())
                    
                        Person person = new Person();
                        person.Id = Convert.ToInt32(dataReader[nameof(Person.Id)]);
                        person.Name = Convert.ToString(dataReader[nameof(Person.Name)]);
                        person.Address = Convert.ToString(dataReader[nameof(Person.Address)]);
                        person.Occupation = Convert.ToString(dataReader[nameof(Person.Occupation)]);
                        person.Birthday = Convert.ToDateTime(dataReader[nameof(Person.Birthday)]);
                        person.FavoriteColor = Convert.ToString(dataReader[nameof(Person.FavoriteColor)]);
                        person.Quote = Convert.ToString(dataReader[nameof(Person.Quote)]);
                        person.Message = Convert.ToString(dataReader[nameof(Person.Message)]);

                        people.Add(person);
                    
                

                await connection.CloseAsync();
            

            return people;
        

        static async IAsyncEnumerable<Person> LoadDataAsyncStream()
        
            string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
            using (SqlConnection connection = new SqlConnection(connectionString))
            
                //SqlDataReader
                await connection.OpenAsync();

                string sql = "Select * From Person";
                SqlCommand command = new SqlCommand(sql, connection);

                using (SqlDataReader dataReader = await command.ExecuteReaderAsync())
                
                    while (await dataReader.ReadAsync())
                    
                        Person person = new Person();
                        person.Id = Convert.ToInt32(dataReader[nameof(Person.Id)]);
                        person.Name = Convert.ToString(dataReader[nameof(Person.Name)]);
                        person.Address = Convert.ToString(dataReader[nameof(Person.Address)]);
                        person.Occupation = Convert.ToString(dataReader[nameof(Person.Occupation)]);
                        person.Birthday = Convert.ToDateTime(dataReader[nameof(Person.Birthday)]);
                        person.FavoriteColor = Convert.ToString(dataReader[nameof(Person.FavoriteColor)]);
                        person.Quote = Convert.ToString(dataReader[nameof(Person.Quote)]);
                        person.Message = Convert.ToString(dataReader[nameof(Person.Message)]);

                        yield return person;
                    
                

                await connection.CloseAsync();
            
        

我想知道 IAsyncEnumerable 是不是最适合这种情况,还是我在使用 IAsyncEnumerable 时查询数据的方式有问题?我可能错了,但我实际上希望使用 IAsyncEnumerable 会更快。 (顺便说一句......差异通常以数百毫秒为单位)

我使用 10,000 行的示例数据尝试了该应用程序。

这也是填充数据的代码以防万一......

static async Task InsertDataAsync()
        
            string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
            using (SqlConnection connection = new SqlConnection(connectionString))
            
                string sql = $"Insert Into Person (Name, Address, Birthday, Occupation, FavoriteColor, Quote, Message) Values";


                for (int i = 0; i < 1000; i++)
                
                    sql += $"('"Randel Ramirez " + i', '"Address " + i', 'new DateTime(1989, 4, 26)', '"Software Engineer " + i', '"Red " + i', '"Quote " + i', '"Message " + i'),";
                

                using (SqlCommand command = new SqlCommand(sql.Remove(sql.Length - 1), connection))
                
                    command.CommandType = CommandType.Text;

                    await connection.OpenAsync();
                    await command.ExecuteNonQueryAsync();
                    await connection.CloseAsync();
                

            
        

【问题讨论】:

这并不奇怪。使用IAsyncEnumerable,您就是awaiting 每个人。使用Task&lt;IEnumerable&gt;,您只需等待一次。 IAsyncEnumerable 的优势在于您可以看到每个人都被抓取:您不必等待所有人都被抓取。如果您不需要,请不要使用IAsyncEnumerable @canton7 这并不完全正确。在 LoadDataAsyncStream 中,代码也在等待每次调用 ExecuteReaderAsync。 @F***Bigler 我说的是消费IAsyncEnumerable / Task&lt;IEnumerable&gt;。在这两种情况下创建它需要相同数量的等待 实际上,IAsyncEnumerable&lt;T&gt; 实现允许“生成”批量值,使 MoveNextAsync 与已批量处理的值同步。 如果注释掉Console.WriteLine($"Id: person.Id, Name: person.Name"); 行,性能差异是否仍然存在?我的理论是,在从数据库中获取数据的同时打印数据可能会减慢与数据库的异步通信。 【参考方案1】:

我认为“我想知道 IAsyncEnumerable 是否最适合这种情况”这个问题的答案在@Bizhan 的批处理示例和随后的讨论中有点迷失了,但从那篇文章中重申:

IAsyncEnumerable 是关于异步检索数据尽快提供单个值

OP 正在测量读取所有记录的总时间,而忽略了第一条记录被检索并准备好供调用代码使用的速度。

如果“这种场景”意味着尽可能快地将所有数据读入内存,那么 IAsyncEnumerable 并不是最适合的。

如果在等待读取所有记录之前开始处理初始记录很重要,那么 IAsyncEnumerable 最适合。

但是,在现实世界中,您确实应该测试整个系统的性能,这将包括对数据的实际处理,而不是简单地将其输出到控制台。特别是在多线程系统中,通过尽可能快地开始同时处理多条记录,同时从数据库中读取更多数据,可以获得最大性能。比较一下,等待单个线程预先读取所有数据(假设您可以将整个数据集放入内存)然后才能开始处理它。

【讨论】:

再举一个例子,在 asp.net 6 中,控制器可以返回一个IAsyncEnumerable,框架将在获取结果的同时流式传输生成的 json。如果传输受 I/O 限制,这既可以缩短到第一个对象的时间,也可以提高总时间。【参考方案2】:

IAsyncEnumerable&lt;T&gt; 本身并不比Task&lt;T&gt; 快或慢。这取决于实现。

IAsyncEnumerable&lt;T&gt; 是关于尽快异步检索提供单个值的数据。

IAsyncEnumerable&lt;T&gt; 允许批量生成值,这将使MoveNextAsync 的某些调用同步,如下例所示:

async Task Main()

    var hasValue = false;
    var asyncEnumerator = GetValuesAsync().GetAsyncEnumerator();
    do
    
        var task = asyncEnumerator.MoveNextAsync();
        Console.WriteLine($"Completed synchronously: task.IsCompleted");
        hasValue = await task;
        if (hasValue)
        
            Console.WriteLine($"Value=asyncEnumerator.Current");
        
    
    while (hasValue);
    await asyncEnumerator.DisposeAsync();


async IAsyncEnumerable<int> GetValuesAsync()

    foreach (var batch in GetValuesBatch())
    
        await Task.Delay(1000);
        foreach (var value in batch)
        
            yield return value;
        
    

IEnumerable<IEnumerable<int>> GetValuesBatch()

    yield return Enumerable.Range(0, 3);
    yield return Enumerable.Range(3, 3);
    yield return Enumerable.Range(6, 3);

输出:

Completed synchronously: False
Value=0
Completed synchronously: True
Value=1
Completed synchronously: True
Value=2
Completed synchronously: False
Value=3
Completed synchronously: True
Value=4
Completed synchronously: True
Value=5
Completed synchronously: False
Value=6
Completed synchronously: True
Value=7
Completed synchronously: True
Value=8
Completed synchronously: True

【讨论】:

注意:IAsyncEnumerator&lt;T&gt;.MoveNextAsync 方法返回一个ValueTask&lt;bool&gt;,除了等待它(一次)或调用它的AsTask 方法之外,用ValueTask&lt;T&gt; 做任何事情都是违反类型的contract .这包括查询其IsCompleted 属性。 没错,@TheodorZoulias。但是,在这种特殊情况下,除了等待它或再次调用 MoveNextAsync 之外,代码并没有保留 ValueTask 这一行呢:Console.WriteLine($"Completed synchronously: task.IsCompleted");? 它总是在下次调用 MoveNextAsync 之前调用。 这不是唯一的限制。不允许在完成之前查询其Result 属性,不允许等待两次,不允许两次调用AsTask 等。

以上是关于为啥使用 IAsyncEnumerable 比返回 async/await Task<T> 慢?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 SqlDataReader 返回和使用 IAsyncEnumerable

在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable

在 C#8 IAsyncEnumerable<T> 中并行化收益返回

如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?

gRPC 服务器流是不是可以将流返回到 Blazor Wasm 而不是 IAsyncEnumerable<T>?

如何实现一个高效的 WhenEach 流式传输任务结果的 IAsyncEnumerable?