将 IAsyncEnumerable 与 Dapper 一起使用

Posted

技术标签:

【中文标题】将 IAsyncEnumerable 与 Dapper 一起使用【英文标题】:Using IAsyncEnumerable with Dapper 【发布时间】:2020-05-14 07:59:45 【问题描述】:

我们最近将使用 Dapper 的 ASP.NET Core API 迁移到 .NET Core 3.1。迁移后,我们觉得有机会将 C# 8 的最新 IAsyncEnumerable 功能用于我们的一个端点。

这是修改前的伪代码:

public async Task<IEnumerable<Item>> GetItems(int id)

    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       
           Id = id
       );

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    
       return null;
    

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
 

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)

    using (reader)
    
        foreach (var item in items)
        
            yield return item;
        
         

IAsyncEnumerable 代码更改后:

// Import Nuget pacakage: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)

    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       
           Id = id
       );

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    
        return null;
    

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
 

private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)

    using (reader)
    
       await foreach (var item in items.ToAsyncEnumerable())
       
           yield return item;
       
    
 

上述方法是使用ToAsyncEnumerable 的灵感来自this post,但我不能100% 确定我是否在正确的地方/上下文中使用它。

问题:

dapper 库只返回IEnumerable,但我们可以使用ToAsyncEnumerable 将其转换为IAsyncEnumerable for async stream 像上面一样吗?

注意:这个问题与What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)? 类似,但我认为这不能回答我的问题。

【问题讨论】:

如果 dapper 不暴露 IAsyncEnumerable&lt;T&gt; API,你希望通过包装 IEnumerable&lt;T&gt; API 获得什么? 像这样包裹IAsyncEnumerable,你将获得nothingIAsyncEnumerable 允许您在值到达时返回它们。但是,您的代码所做的是检索所有内容,然后使用虚假的异步操作将其返回。客户认为他们很快就会得到结果,但实际上他们必须像以前一样等待 嗨@abatishchev,谢谢你这么说。这种情况发生了很多次,以至于我不再问为什么我被否决了。我会花 15 分钟到半小时来提出问题,有时我会在几分钟内被否决。猜猜,否决票的定义不是很清楚。对我来说,如果一个问题格式正确且精确,如果不赞成,则可能不值得反对。 @AnkitVijay:干杯!由于某种原因,评论被删除(被版主?)。搞砸这个。 嗨@svw,如果这个问题是你搜索结果的顶部,它只是表明网络上没有太多关于这个主题的信息。您如何期望有人在这种情况下进行更多研究?我认为假设在问题发布到 SO 之前没有进行任何研究是错误的。我相信您会同意,提出一个问题以获得社区的良好响应是一项相当大的努力。无论如何,downvotes 并没有真正困扰我了。 :) 【参考方案1】:

更新:当我第一次写这个答案时,我不知道异步迭代器。感谢 Theodor Zoulias 指出这一点。鉴于此,一种更简单的方法是可能的:

using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

while (await reader.ReadAsync()) 
    yield return rowParser(reader);

原答案:

这是我编写的 IAsyncEnumerable 包装器,它可以帮助那些想要使用 async/await 流式传输无缓冲数据并且还想要 Dapper 类型映射的强大功能的人:

public class ReaderParser<T> : IAsyncEnumerable<T> 
    public ReaderParser(SqlDataReader reader) 
        Reader = reader;
    
    private SqlDataReader Reader  get; 
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) 
        return new ReaderParserEnumerator<T>(Reader);
    

public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> 
    public ReaderParserEnumerator(SqlDataReader reader) 
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader  get; 
    private Func<IDataReader, T> RowParser  get; 
    public async ValueTask DisposeAsync() 
        await Reader.DisposeAsync();
    
    public async ValueTask<bool> MoveNextAsync() 
        return await Reader.ReadAsync();
    

用法:

var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

然后,包System.Linq.Async 基本上添加了所有你知道和喜欢的漂亮的IEnumerable 扩展,例如在我的使用中:

var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();

【讨论】:

AFAICS cancellationToken 方法的 GetAsyncEnumerator 参数被忽略。另外,当您可以编写 async iterator 时,为什么要显式实现接口? 这并不是一个强大的解决方案,真的。诚然,我不知道异步迭代器。我会尝试进行更改并适当地编辑我的答案。 对于那些在未来找到这个答案的人 - 它有效,但请记住,即使枚举器被释放,读者也会从数据库中获取(并在客户端忽略)整个内容预先。不幸的是,我还没有找到任何使用 Dapper 的方法来规避这个问题

以上是关于将 IAsyncEnumerable 与 Dapper 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

ToArrayAsync() 抛出“源 IQueryable 未实现 IAsyncEnumerable”

gRPC 服务器流是不是可以将流返回到 Blazor Wasm 而不是 IAsyncEnumerable<T>?

是否可以将 IObservable<T> 转换为 IAsyncEnumerable<T>

如何通过 Remix 将 Dapp 连接到 Metamask 并与部署在 Ropsten 上的智能合约进行交互

处理以太坊 DApp 中的用户资料

为啥我不允许在返回 IAsyncEnumerable 的方法中返回 IAsyncEnumerable