将 IAsyncEnumerable 与 Dapper 一起使用
Posted
技术标签:
【中文标题】将 IAsyncEnumerable 与 Dapper 一起使用【英文标题】:Using IAsyncEnumerable with Dapper 【发布时间】:2020-05-14 07:59:45 【问题描述】:我们最近将使用 Dapper
的 ASP.NET Core API 迁移到 .NET Core 3.1。迁移后,我们觉得有机会将 C# 8
的最新 IAsyncEnumerable
功能用于我们的一个端点。
这是修改前的伪代码:
public async Task<IEnumerable<Item>> GetItems(int id)
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
Id = id
);
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
return null;
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
using (reader)
foreach (var item in items)
yield return item;
IAsyncEnumerable
代码更改后:
// Import Nuget pacakage: System.Linq.Async
public async Task<IAsyncEnumerable<Item>> GetItems(int id)
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
Id = id
);
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
return null;
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
using (reader)
await foreach (var item in items.ToAsyncEnumerable())
yield return item;
上述方法是使用ToAsyncEnumerable
的灵感来自this post,但我不能100% 确定我是否在正确的地方/上下文中使用它。
问题:
dapper 库只返回IEnumerable
,但我们可以使用ToAsyncEnumerable
将其转换为IAsyncEnumerable
for async
stream
像上面一样吗?
注意:这个问题与What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)? 类似,但我认为这不能回答我的问题。
【问题讨论】:
如果 dapper 不暴露IAsyncEnumerable<T>
API,你希望通过包装 IEnumerable<T>
API 获得什么?
像这样包裹IAsyncEnumerable
,你将获得nothing。 IAsyncEnumerable
允许您在值到达时返回它们。但是,您的代码所做的是检索所有内容,然后使用虚假的异步操作将其返回。客户认为他们很快就会得到结果,但实际上他们必须像以前一样等待
嗨@abatishchev,谢谢你这么说。这种情况发生了很多次,以至于我不再问为什么我被否决了。我会花 15 分钟到半小时来提出问题,有时我会在几分钟内被否决。猜猜,否决票的定义不是很清楚。对我来说,如果一个问题格式正确且精确,如果不赞成,则可能不值得反对。
@AnkitVijay:干杯!由于某种原因,评论被删除(被版主?)。搞砸这个。
嗨@svw,如果这个问题是你搜索结果的顶部,它只是表明网络上没有太多关于这个主题的信息。您如何期望有人在这种情况下进行更多研究?我认为假设在问题发布到 SO 之前没有进行任何研究是错误的。我相信您会同意,提出一个问题以获得社区的良好响应是一项相当大的努力。无论如何,downvotes 并没有真正困扰我了。 :)
【参考方案1】:
更新:当我第一次写这个答案时,我不知道异步迭代器。感谢 Theodor Zoulias 指出这一点。鉴于此,一种更简单的方法是可能的:
using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();
while (await reader.ReadAsync())
yield return rowParser(reader);
原答案:
这是我编写的 IAsyncEnumerable
包装器,它可以帮助那些想要使用 async/await 流式传输无缓冲数据并且还想要 Dapper 类型映射的强大功能的人:
public class ReaderParser<T> : IAsyncEnumerable<T>
public ReaderParser(SqlDataReader reader)
Reader = reader;
private SqlDataReader Reader get;
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
return new ReaderParserEnumerator<T>(Reader);
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T>
public ReaderParserEnumerator(SqlDataReader reader)
Reader = reader;
RowParser = reader.GetRowParser<T>();
public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
private SqlDataReader Reader get;
private Func<IDataReader, T> RowParser get;
public async ValueTask DisposeAsync()
await Reader.DisposeAsync();
public async ValueTask<bool> MoveNextAsync()
return await Reader.ReadAsync();
用法:
var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);
然后,包System.Linq.Async
基本上添加了所有你知道和喜欢的漂亮的IEnumerable
扩展,例如在我的使用中:
var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
【讨论】:
AFAICScancellationToken
方法的 GetAsyncEnumerator
参数被忽略。另外,当您可以编写 async iterator 时,为什么要显式实现接口?
这并不是一个强大的解决方案,真的。诚然,我不知道异步迭代器。我会尝试进行更改并适当地编辑我的答案。
对于那些在未来找到这个答案的人 - 它有效,但请记住,即使枚举器被释放,读者也会从数据库中获取(并在客户端忽略)整个内容预先。不幸的是,我还没有找到任何使用 Dapper 的方法来规避这个问题以上是关于将 IAsyncEnumerable 与 Dapper 一起使用的主要内容,如果未能解决你的问题,请参考以下文章
ToArrayAsync() 抛出“源 IQueryable 未实现 IAsyncEnumerable”
gRPC 服务器流是不是可以将流返回到 Blazor Wasm 而不是 IAsyncEnumerable<T>?
是否可以将 IObservable<T> 转换为 IAsyncEnumerable<T>