如何使用 SqlDataReader 返回和使用 IAsyncEnumerable

Posted

技术标签:

【中文标题】如何使用 SqlDataReader 返回和使用 IAsyncEnumerable【英文标题】:How to Return and Consume an IAsyncEnumerable with SqlDataReader 【发布时间】:2020-03-08 13:52:34 【问题描述】:

请看以下两种方法。第一个返回IAsyncEnumerable。第二个试图消耗它。

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class SqlUtility

    public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
        string connectionString, SqlParameter[] parameters, string commandText,
        [EnumeratorCancellation]CancellationToken cancellationToken)
    
        using (SqlConnection connection = new SqlConnection(connectionString))
        
            await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
            using (SqlCommand command = new SqlCommand(commandText, connection))
            
                command.Parameters.AddRange(parameters);
                using (var reader = await command.ExecuteReaderAsync()
                    .ConfigureAwait(false))
                
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    
                        yield return reader;
                    
                
            
        
    

    public static async Task Example()
    
        const string connectionString =
            "Server=localhost;Database=[Redacted];Integrated Security=true";
        SqlParameter[] parameters = new SqlParameter[]
        
            new SqlParameter("VideoID", SqlDbType.Int)  Value = 1000 
        ;
        const string commandText = "select * from Video where VideoID=@VideoID";
        IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
            parameters, commandText, CancellationToken.None);
        IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
        object videoID = firstRecord["VideoID"]; //Should be 1000.
        // Instead, I get this exception:
        // "Invalid attempt to call MetaData when reader is closed."
    

当代码尝试读取结果 IDataReaderobject videoID = firstRecord["VideoID"];)时,我得到了这个异常:

阅读器关闭时调用元数据的尝试无效。

这是因为SqlDataReader 已被释放。有人可以提供推荐的方法来以异步方式枚举SqlDataReader,以便调用方法可以使用每个结果记录吗?谢谢。

【问题讨论】:

【参考方案1】:

在这种情况下,LINQ 不是你的朋友,因为FirstAsync 将在返回结果之前关闭迭代器,这不是 ADO.NET 所期望的;基本上:不要在这里使用 LINQ,或者至少:不要以这种方式。您也许可以使用Select 之类的东西在序列仍处于打开状态时执行投影,或者将这里的所有工作卸载到像 Dapper 这样的工具可能会更容易。或者,手动进行:

await foreach (var record in records)

    // TODO: process record
    // (perhaps "break"), because you only want the first

【讨论】:

我现在明白了。谢谢你。您的回答以最简洁的方式填补了我所缺少的知识。我用你告诉我的在读者处置之前将记录数据缓存到字典中。【参考方案2】:

您可以通过不返回依赖于仍处于打开状态的连接的对象来避免这种情况。例如,如果您只需要VideoID,则只需返回它(我假设它是int):

public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)

    ...
                    yield return reader["VideoID"];
    ...

或者项目到你自己的类中:

public class MyRecord 
    public int VideoId  get; set; 


public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)

    ...
                    yield return new MyRecord 
                        VideoId = reader["VideoID"]
                    
    ...

或者按照 Marc 的建议,在第一个之后使用 foreachbreak,在您的情况下看起来像这样:

IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)

    videoID = record["VideoID"];
    break;

【讨论】:

【参考方案3】:

当您公开一个打开的DataReader 时,关闭它以及底层Connection 的责任现在属于调用者,因此您不应处置任何东西。相反,您应该使用接受CommandBehavior 参数的DbCommand.ExecuteReaderAsync 重载,并传递CommandBehavior.CloseConnection 值:

命令执行时,关闭关联的DataReader对象时,关闭关联的Connection对象。

那你就只能希望调用者按规矩办事,及时调用DataReader.Close方法,在对象被垃圾回收之前不让连接打开。出于这个原因,暴露一个开放的DataReader 应该被认为是一种极端的性能优化技术,应该谨慎使用。

顺便说一句,如果您返回 IEnumerable&lt;IDataRecord&gt; 而不是 IAsyncEnumerable&lt;IDataRecord&gt;,您将遇到同样的问题。

【讨论】:

虽然我最终没有这样做,但我认为这是一个有趣的解决方案。你知道命令什么时候被处理吗?它只是在阅读器和连接保持打开状态时正常处理吗? 如果你不关闭或释放一个连接,当垃圾收集器回收对象时它会被关闭。确切的时间是不确定的,因为 GC 过程是不确定的。 谢谢。我明白那个。我在问命令。当阅读器关闭时,连接将被关闭。我想知道命令。再次感谢。 所有一次性内置对象都一样。如果您不明确处置它们,它们会在回收时被处置。 好的。我想知道当读者被处置时,命令是否会与连接一起被处置。也许我可以尝试使用GetRecordsAsync 中的using 语句正常处理命令。我不确定这是否会绊倒读者,因为它将在命令处理后使用。也许有一天我会尝试一下。谢谢。【参考方案4】:

要添加到其他答案,您可以使您的实用程序方法通用并添加一个投影委托Func&lt;IDataRecord, T&gt; projection,作为这样的参数:

public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
    string connectionString, SqlParameter[] parameters, string commandText,
    Func<IDataRecord, T> projection, // Parameter here
    [EnumeratorCancellation] CancellationToken cancellationToken)

    ...
                    yield return projection(reader); // Projected here
    ...

然后在调用的时候传入一个 lambda 或者引用这样的方法组:

public static object GetVideoId(IDataRecord dataRecord)
    => dataRecord["VideoID"];

这样的:

GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);

【讨论】:

好主意。但这并不能阻止某人将IDataRecord 投射到自己(GetRecordsAsync(..., x =&gt; x, ...)上,从而导致同样的问题。暴露IDataRecords 本身就很麻烦。至少您的建议为知道自己在做什么并遵守规则的消费者提供了一条出路。【参考方案5】:

在 2021 年末,我有这个确切的问题。我找不到一个完整的例子,所以我只是把能找到的东西弄乱了,直到我找到了一些工作。

这是我的完整代码示例,虽然很简单(因此您可以稍后对其进行扩展),以及一些 cmets 详细说明了我在此过程中遇到的一些坑:

// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )

    // we need a connection that will last as long as the reader is open,
    // alternately you could pass in an open connection.
    using SqlConnection connection = new SqlConnection( connectionString );
    using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );

    await connection.OpenAsync();
    var reader = await cmd.ExecuteReaderAsync();
    while( await reader.ReadAsync() )
    
        yield return func( reader );
    

然后在您的(异步)代码的任何其他部分,您可以在 await foreach 循环中调用您的函数:

private static async Task CallIAsyncEnumerable()

    await foreach( var category in ReadAsync( ReaderToCategory ) )
    
        // do something with your category; save it in a list, write it to disk,
        // make an HTTP call ... the whole world is yours!
    


// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )

    return new Category()
    
        Code = ( string )reader[ "Code" ],
        Group = ( string )reader[ "Group" ]
    ;


我发现的其他几件事:您不能在 try 中使用 yield,但是您可以将直到(包括)cmd.ExecuteReaderAsync() 的所有内容都填充到 try 中,或者返回一个单独的方法数据读取器。或者您可以将await foreach 包装在try 块中;我认为问题在于在尝试之外向调用者让步(这是有道理的,在你考虑之后)。

如果您使用其他方法生成阅读器,将连接传递给该方法,这样您就可以控制其生命周期。如果您的方法创建连接、执行命令并返回SqlDataReader,则连接将关闭(如果您使用了“使用”),然后您才能从阅读器中读取。同样,如果您仔细考虑一下,这完全有道理,但它让我绊倒了几分钟。

祝你好运,我希望这对以后的其他人有所帮助!

【讨论】:

以上是关于如何使用 SqlDataReader 返回和使用 IAsyncEnumerable的主要内容,如果未能解决你的问题,请参考以下文章

如何在 SqlDataReader 之前知道或获取数组大小?并将其作为 json 返回

如何在 JavaScript 中使用 C# SQLDataReader 对象

如何在 C# 中使用 SqlDataReader 获取行数

如何使用 SqlDataReader 像使用 SqlDataAdapter 一样快地填充 DataTable?

c#你如何从sqldatareader返回数据集?

c# - 从 SqlDataReader 填充通用列表