返回查询结果的异步流

Posted

技术标签:

【中文标题】返回查询结果的异步流【英文标题】:Returning async stream of query results 【发布时间】:2014-07-20 08:06:08 【问题描述】:

我有以下 WebApi 方法,它从 RavenDB 返回一个无限的结果流:

public IEnumerable<Foo> Get()

    var query = DocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = DocumentSession.Advanced.Stream(query))
        while (enumerator.MoveNext())
            yield return enumerator.Current.Document;

现在我想让它异步。天真的方法当然行不通:

public async Task<IEnumerable<Location>> Get()

    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        while (await enumerator.MoveNextAsync())
            yield return enumerator.Current.Document;

...因为方法不能既是异步又是迭代器。

【问题讨论】:

您可以实现自己的迭代器。 但是,在迭代器上调用MoveNext 也必须是异步的——这意味着你不能实现IEnumerable&lt;T&gt;,你必须定义你自己的接口。而且您也无法在 foreach 循环中使用该迭代器。 是的,所有这些限制都是正确的。由于我只是返回这个(将由 WebApi 序列化),我不需要很大的灵活性。也许实现一个理解 Task&lt;IAsyncEnumerator&lt;StreamResult&lt;T&gt;&gt;&gt; 的 MediaTypeFormatter Using async / await with DataReader ? ( without middle buffers!) 的可能重复项 @noseratio,问题类似,但不是重复的。 @noseratio 建议的解决方案将适用。但是由于我使用的是Web Api,所以我有机会直接使用格式化程序来支持IAsyncEnumerator,所以我可以避免使用助手。使用 yield 是一种解决方案,而不是要求。 【参考方案1】:

由于这是一种 WebAPI 操作方法,HTTP 将您限制为单个响应。如果你只返回一个IEnumerable&lt;T&gt;,那么 ASP.NET 会在内存中枚举它,然后发送响应。

如果你对这个内存进程没问题,那么你可以自己做同样的事情:

public async Task<List<Location>> Get()

  var result = new List<Location>();
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
    while (await enumerator.MoveNextAsync())
      result.Add(enumerator.Current.Document);
  return result;

但是,我认为使用流式响应会更好,您可以通过PushStreamContent 获得;像这样:

public HttpResponseMessage Get()

  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  HttpResponseMessage response = Request.CreateResponse();
  response.Content = new PushStreamContent(
      async (stream, content, context) =>
      
        using (stream)
        using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        
          while (await enumerator.MoveNextAsync())
          
            // TODO: adjust encoding as necessary.
            var serialized = JsonConvert.SerializeObject(enumerator.CurrentDocument);
            var data = UTF8Encoding.UTF8.GetBytes(serialized);
            var countPrefix = BitConverter.GetBytes(data.Length);
            await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
            await stream.WriteAsync(data, 0, data.Length);
          
        
      );
  return response;

流式响应不需要您的服务器将整个响应保存在内存中;但是,您必须决定将文档写入响应流的正确方法。上面的示例代码只是将它们转换为 JSON,以 UTF8 编码,以及(二进制)长度前缀这些字符串。

【讨论】:

这实际上不是一个坏主意(PushStream 那个)。它甚至可能比我的好一点,它读取异步但写入同步。也许两者结合会很酷。【参考方案2】:

您可以实现自己的迭代器,而不是让编译器为您生成一个。

但是,在该迭代器上调用 MoveNext 也必须是异步的 - 这意味着您无法实现 IEnumerable&lt;T&gt;`IEnumerator, you'd have to define your own interface, e.g.,IAsyncEnumerator`。 而且您也无法在 foreach 循环中使用该迭代器。

在我看来,最好的办法是做StreamAsync 所做的事情。创建一个自定义类型IAsyncEnumerable,它返回一个实现自定义async T MoveNextAsync() 方法的IAsyncEnumerator&lt;T&gt;。 enumerable 将包装您的 query 对象,并且 enumerator 将获取文档会话的文档。

internal class AsyncDocumentEnumerable : IAsyncEnumerable<Document>

    private readonly YourQueryType _query;
    public AsyncDocumentEnumerable(YourQueryType query)
    
        _query = query;
    

    IAsyncEnumerator<Document> GetEnumerator()
    
        return new AsyncDocumentEnumerator(_query);
    



internal class AsyncDocumentEnumerator : IAsyncDocumentEnumerator<Document>

    private readonly YourQueryType _query;
    private IAsyncEnumerator<DocumentSession> _iter;

    public AsyncDocumentEnumerator(YourQueryType query)
    
        _query = query;
    

    public Task<bool> async MoveNextAsync()
    
        if(_iter == null)
            _iter = await AsyncDocumentSession.Advanced.StreamAsync(query);

        bool moved = await _iter.MoveNextAsync();

        if(moved)
            Current = _iter.Current.Document;
        return moved;
    

    public Document Currentget; private set;

【讨论】:

那行不通。 MoveNext 必须返回 Task&lt;bool&gt;,而不是 bool,所以我在同一个地方。 @DiegoMijelshon 哎呀,我的错!我就是这个意思。 仍然不是太有用 - 我仍然有一个 WebApi 不能使用的异步枚举/枚举器,就像以前一样,但被包装了。请参阅我自己的答案。【参考方案3】:

毕竟这并不难。解决方案是一个可以异步处理枚举器并将 JSON 写入流的格式化程序:

public class CustomJsonMediaTypeFormatter : JsonMediaTypeFormatter

    public override async Task WriteToStreamAsync(
           Type type, object value, Stream writeStream, HttpContent content,
           TransportContext transportContext, CancellationToken cancellationToken)
    
        if (type.IsGenericType &&
            type.GetGenericTypeDefinition() == typeof(IAsyncEnumerator<>))
        
            var writer = new JsonTextWriter(new StreamWriter(writeStream))
                          CloseOutput = false ;
            writer.WriteStartArray();
            await Serialize((dynamic)value, writer);
            writer.WriteEndArray();
            writer.Flush();
        
        else
            await base.WriteToStreamAsync(type, value, writeStream, content,
                                          transportContext, cancellationToken);
    

    async Task Serialize<T>(IAsyncEnumerator<StreamResult<T>> enumerator,
                            JsonTextWriter writer)
    
        var serializer = JsonSerializer.Create(SerializerSettings);
        while (await enumerator.MoveNextAsync())
            serializer.Serialize(writer, enumerator.Current.Document);
    

现在我的 WebApi 方法比以前更短了:

public Task<IAsyncEnumerator<StreamResult<Foo>>> Get()

    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    return AsyncDocumentSession.Advanced.StreamAsync(query);

【讨论】:

【参考方案4】:

他们在 C#8 中引入了IAsyncEnumerable&lt;int&gt;

    async IAsyncEnumerable<int> GetBigResultsAsync()
    
        await foreach (var result in GetResultsAsync())
        
            if (result > 20) yield return result; 
        
    

【讨论】:

【参考方案5】:

您可以查看ReactiveExtensions for .Net,它们是专门为您的需求而设计的。最终结果可能如下所示:

public IObservable<Location> Get()
        
            var locations = new Subject<Location>();

            Task.Run(() =>
                     
                         var query = DocumentSession.Query<Foo, FooIndex>();
                         foreach (var document in DocumentSession.Advanced.Stream(query))
                         
                             locations.OnNext(document);
                         
                         locations.OnCompleted();
                     );

            return locations;
        

【讨论】:

你只是在那里包装了一个非异步调用。它违背了异步的目的(在 IO 发生时不使用线程) 然后不要换行。我刚刚做了一个使用同步 API 的例子。如果你有异步 API,你肯定不需要任何 Task.Run() 调用。有很多方法可以从任何类型的 API(同步和异步)创建 IObservable&lt;Location&gt; 序列。 我刚刚意识到 AsyncDocumentSession 是 RavenDB 客户端公开的东西,而不是您的自定义东西......抱歉,我以前从未见过这个,所以我无法使用 @987654326 创建工作原型@ 很容易,但我仍然相信 IObservable 非常适合您的问题。

以上是关于返回查询结果的异步流的主要内容,如果未能解决你的问题,请参考以下文章

带有mysql查询的异步函数不会返回查询结果node.js

流分析 - 查询嵌套数组返回 0 个结果

异步流使用注意事项

java 异步查询转同步多种实现方式:循环等待,CountDownLatch,Spring Even

异步处理http请求同步返回结果

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )