返回查询结果的异步流
Posted
技术标签:
【中文标题】返回查询结果的异步流【英文标题】:Returning async stream of query results 【发布时间】:2014-07-20 08:06:08 【问题描述】:我有以下 WebApi 方法,它从 RavenDB 返回一个无限的结果流:
public IEnumerable<Foo> Get()
var query = DocumentSession.Query<Foo, FooIndex>();
using (var enumerator = DocumentSession.Advanced.Stream(query))
while (enumerator.MoveNext())
yield return enumerator.Current.Document;
现在我想让它异步。天真的方法当然行不通:
public async Task<IEnumerable<Location>> Get()
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
while (await enumerator.MoveNextAsync())
yield return enumerator.Current.Document;
...因为方法不能既是异步又是迭代器。
【问题讨论】:
您可以实现自己的迭代器。 但是,在迭代器上调用MoveNext
也必须是异步的——这意味着你不能实现IEnumerable<T>
,你必须定义你自己的接口。而且您也无法在 foreach
循环中使用该迭代器。
是的,所有这些限制都是正确的。由于我只是返回这个(将由 WebApi 序列化),我不需要很大的灵活性。也许实现一个理解 Task<IAsyncEnumerator<StreamResult<T>>>
的 MediaTypeFormatter
Using async / await with DataReader ? ( without middle buffers!) 的可能重复项
@noseratio,问题类似,但不是重复的。
@noseratio 建议的解决方案将适用。但是由于我使用的是Web Api,所以我有机会直接使用格式化程序来支持IAsyncEnumerator,所以我可以避免使用助手。使用 yield 是一种解决方案,而不是要求。
【参考方案1】:
由于这是一种 WebAPI 操作方法,HTTP 将您限制为单个响应。如果你只返回一个IEnumerable<T>
,那么 ASP.NET 会在内存中枚举它,然后发送响应。
如果你对这个内存进程没问题,那么你可以自己做同样的事情:
public async Task<List<Location>> Get()
var result = new List<Location>();
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
while (await enumerator.MoveNextAsync())
result.Add(enumerator.Current.Document);
return result;
但是,我认为使用流式响应会更好,您可以通过PushStreamContent
获得;像这样:
public HttpResponseMessage Get()
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
HttpResponseMessage response = Request.CreateResponse();
response.Content = new PushStreamContent(
async (stream, content, context) =>
using (stream)
using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
while (await enumerator.MoveNextAsync())
// TODO: adjust encoding as necessary.
var serialized = JsonConvert.SerializeObject(enumerator.CurrentDocument);
var data = UTF8Encoding.UTF8.GetBytes(serialized);
var countPrefix = BitConverter.GetBytes(data.Length);
await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
await stream.WriteAsync(data, 0, data.Length);
);
return response;
流式响应不需要您的服务器将整个响应保存在内存中;但是,您必须决定将文档写入响应流的正确方法。上面的示例代码只是将它们转换为 JSON,以 UTF8 编码,以及(二进制)长度前缀这些字符串。
【讨论】:
这实际上不是一个坏主意(PushStream 那个)。它甚至可能比我的好一点,它读取异步但写入同步。也许两者结合会很酷。【参考方案2】:您可以实现自己的迭代器,而不是让编译器为您生成一个。
但是,在该迭代器上调用 MoveNext
也必须是异步的 - 这意味着您无法实现 IEnumerable<T>
`IEnumerator, you'd have to define your own interface, e.g.,
IAsyncEnumerator`。
而且您也无法在 foreach 循环中使用该迭代器。
在我看来,最好的办法是做StreamAsync
所做的事情。创建一个自定义类型IAsyncEnumerable
,它返回一个实现自定义async T MoveNextAsync()
方法的IAsyncEnumerator<T>
。 enumerable 将包装您的 query
对象,并且 enumerator 将获取文档会话的文档。
internal class AsyncDocumentEnumerable : IAsyncEnumerable<Document>
private readonly YourQueryType _query;
public AsyncDocumentEnumerable(YourQueryType query)
_query = query;
IAsyncEnumerator<Document> GetEnumerator()
return new AsyncDocumentEnumerator(_query);
internal class AsyncDocumentEnumerator : IAsyncDocumentEnumerator<Document>
private readonly YourQueryType _query;
private IAsyncEnumerator<DocumentSession> _iter;
public AsyncDocumentEnumerator(YourQueryType query)
_query = query;
public Task<bool> async MoveNextAsync()
if(_iter == null)
_iter = await AsyncDocumentSession.Advanced.StreamAsync(query);
bool moved = await _iter.MoveNextAsync();
if(moved)
Current = _iter.Current.Document;
return moved;
public Document Currentget; private set;
【讨论】:
那行不通。 MoveNext 必须返回Task<bool>
,而不是 bool,所以我在同一个地方。
@DiegoMijelshon 哎呀,我的错!我就是这个意思。
仍然不是太有用 - 我仍然有一个 WebApi 不能使用的异步枚举/枚举器,就像以前一样,但被包装了。请参阅我自己的答案。【参考方案3】:
毕竟这并不难。解决方案是一个可以异步处理枚举器并将 JSON 写入流的格式化程序:
public class CustomJsonMediaTypeFormatter : JsonMediaTypeFormatter
public override async Task WriteToStreamAsync(
Type type, object value, Stream writeStream, HttpContent content,
TransportContext transportContext, CancellationToken cancellationToken)
if (type.IsGenericType &&
type.GetGenericTypeDefinition() == typeof(IAsyncEnumerator<>))
var writer = new JsonTextWriter(new StreamWriter(writeStream))
CloseOutput = false ;
writer.WriteStartArray();
await Serialize((dynamic)value, writer);
writer.WriteEndArray();
writer.Flush();
else
await base.WriteToStreamAsync(type, value, writeStream, content,
transportContext, cancellationToken);
async Task Serialize<T>(IAsyncEnumerator<StreamResult<T>> enumerator,
JsonTextWriter writer)
var serializer = JsonSerializer.Create(SerializerSettings);
while (await enumerator.MoveNextAsync())
serializer.Serialize(writer, enumerator.Current.Document);
现在我的 WebApi 方法比以前更短了:
public Task<IAsyncEnumerator<StreamResult<Foo>>> Get()
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
return AsyncDocumentSession.Advanced.StreamAsync(query);
【讨论】:
【参考方案4】:他们在 C#8 中引入了IAsyncEnumerable<int>
async IAsyncEnumerable<int> GetBigResultsAsync()
await foreach (var result in GetResultsAsync())
if (result > 20) yield return result;
【讨论】:
【参考方案5】:您可以查看ReactiveExtensions for .Net,它们是专门为您的需求而设计的。最终结果可能如下所示:
public IObservable<Location> Get()
var locations = new Subject<Location>();
Task.Run(() =>
var query = DocumentSession.Query<Foo, FooIndex>();
foreach (var document in DocumentSession.Advanced.Stream(query))
locations.OnNext(document);
locations.OnCompleted();
);
return locations;
【讨论】:
你只是在那里包装了一个非异步调用。它违背了异步的目的(在 IO 发生时不使用线程) 然后不要换行。我刚刚做了一个使用同步 API 的例子。如果你有异步 API,你肯定不需要任何Task.Run()
调用。有很多方法可以从任何类型的 API(同步和异步)创建 IObservable<Location>
序列。
我刚刚意识到 AsyncDocumentSession
是 RavenDB 客户端公开的东西,而不是您的自定义东西......抱歉,我以前从未见过这个,所以我无法使用 @987654326 创建工作原型@ 很容易,但我仍然相信 IObservable
非常适合您的问题。以上是关于返回查询结果的异步流的主要内容,如果未能解决你的问题,请参考以下文章
java 异步查询转同步多种实现方式:循环等待,CountDownLatch,Spring Even
Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )